/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.snapshots;

import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.ReceivableSnapshotStore;
import io.camunda.zeebe.snapshots.ReceivedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotChunk;
import io.camunda.zeebe.snapshots.SnapshotChunkReader;
import io.camunda.zeebe.snapshots.SnapshotChunkWrapper;
import io.camunda.zeebe.snapshots.SnapshotException;
import io.camunda.zeebe.snapshots.SnapshotTransferUtil;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStore;
import io.camunda.zeebe.snapshots.impl.SnapshotWriteException;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractPathAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.OptionalAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.AutoClose;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link ReceivedSnapshot}s created through a {@link ReceivableSnapshotStore}.
 *
 * <p>Every test runs against two {@link FileBasedSnapshotStore}s living in separate temporary
 * folders: a sender store used to take and persist a snapshot, and a receiver store into which
 * the chunks of that snapshot are applied.
 */
public class ReceivedSnapshotTest {
    /** Partition id handed to both snapshot stores. */
    private static final int PARTITION_ID = 1;

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public ActorSchedulerRule scheduler = new ActorSchedulerRule();
    // NOTE(review): @AutoClose comes from JUnit Jupiter, while this class is driven by JUnit 4
    // (@Rule/@Before/org.junit.Test). Presumably nothing processes the annotation here, so the
    // stores are not closed by it - confirm, and consider an explicit @After if they must be.
    @AutoClose
    ConstructableSnapshotStore senderSnapshotStore;
    @AutoClose
    ReceivableSnapshotStore receiverSnapshotStore;

    /** Creates the sender and receiver stores and submits each of them to the actor scheduler. */
    @Before
    public void beforeEach() throws Exception {
        final Path senderDirectory = temporaryFolder.newFolder("sender").toPath();
        final Path receiverDirectory = temporaryFolder.newFolder("receiver").toPath();

        // The first constructor argument is presumably the broker/node id - confirm against
        // FileBasedSnapshotStore.
        final FileBasedSnapshotStore sender =
            new FileBasedSnapshotStore(0, PARTITION_ID, senderDirectory, snapshotPath -> Map.of(), new SimpleMeterRegistry());
        senderSnapshotStore = sender;
        scheduler.get().submitActor(sender).join();

        final FileBasedSnapshotStore receiver =
            new FileBasedSnapshotStore(0, PARTITION_ID, receiverDirectory, snapshotPath -> Map.of(), new SimpleMeterRegistry());
        receiverSnapshotStore = receiver;
        scheduler.get().submitActor(receiver).join();
    }

    @Test
    public void shouldThrowExceptionOnInvalidSnapshotId() {
        // when - then
        Assertions.assertThatThrownBy(() -> receiverSnapshotStore.newReceivedSnapshot("invalid").join())
            .isInstanceOf(IllegalStateException.class);
    }

    @Test
    public void shouldReturnIndexOfSnapshot() {
        // when
        final ReceivedSnapshot receivedSnapshot = receiverSnapshotStore.newReceivedSnapshot("1-0-123-121-1").join();

        // then
        Assertions.assertThat(receivedSnapshot.index()).isEqualTo(1L);
    }

    @Test
    public void shouldNotCommitUntilPersisted() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();

        // when
        final ReceivedSnapshot receivedSnapshot = receiveSnapshot(persistedSnapshot);

        // then
        Assertions.assertThat(receivedSnapshot.getPath())
            .as("there exists a pending snapshot")
            .isDirectory();
        Assertions.assertThat(receiverSnapshotStore.getLatestSnapshot())
            .as("the pending snapshot was not committed")
            .isEmpty();
    }

    @Test
    public void shouldReceiveSnapshot() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();

        // when
        final PersistedSnapshot receivedSnapshot = receiveSnapshot(persistedSnapshot).persist().join();

        // then
        Assertions.assertThat(receiverSnapshotStore.getLatestSnapshot())
            .as("the received snapshot was committed and is the latest snapshot")
            .hasValue(receivedSnapshot);
        Assertions.assertThat(receivedSnapshot.getChecksums().sameChecksums(persistedSnapshot.getChecksums()))
            .as("the received snapshot has the same checksum as the sent snapshot")
            .isTrue();
        Assertions.assertThat(receivedSnapshot.getId())
            .as("the received snapshot has the same ID as the sent snapshot")
            .isEqualTo(persistedSnapshot.getId());
    }

    @Test
    public void shouldDeletePendingSnapshotDirOnAbort() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = receiveSnapshot(persistedSnapshot);

        // when
        receivedSnapshot.abort().join();

        // then
        Assertions.assertThat(receivedSnapshot.getPath())
            .as("the pending snapshot does not exist anymore after purging")
            .doesNotExist();
    }

    @Test
    public void shouldNotDeletePersistedSnapshotOnPurgePendingOnStore() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final PersistedSnapshot receivedSnapshot = receiveSnapshot(persistedSnapshot).persist().join();

        // when
        receiverSnapshotStore.abortPendingSnapshots().join();

        // then
        Assertions.assertThat(receivedSnapshot.getPath())
            .as("the received snapshot still exists")
            .exists();
        Assertions.assertThat(receivedSnapshot)
            .as("the previous snapshot should still be the latest snapshot")
            .isEqualTo(receiverSnapshotStore.getLatestSnapshot().orElseThrow());
        Assertions.assertThat(receivedSnapshot.getChecksums().sameChecksums(persistedSnapshot.getChecksums()))
            .as("the received snapshot still has the same checksum")
            .isTrue();
    }

    @Test
    public void shouldPersistEvenIfSameChunkIsConsumedMultipleTimes() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);

        // when - the first chunk is applied twice, every following chunk once
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            final SnapshotChunk firstChunk = snapshotChunkReader.next();
            receivedSnapshot.apply(firstChunk).join();
            receivedSnapshot.apply(firstChunk).join();
            while (snapshotChunkReader.hasNext()) {
                receivedSnapshot.apply(snapshotChunkReader.next()).join();
            }
        }
        final PersistedSnapshot receivedPersistedSnapshot = receivedSnapshot.persist().join();

        // then
        Assertions.assertThat(receivedPersistedSnapshot)
            .as("the snapshot was persisted even if one chunk was applied more than once")
            .isEqualTo(receiverSnapshotStore.getLatestSnapshot().orElseThrow());
    }

    @Test
    public void shouldThrowWhenAlreadyExistingSnapshotWasReceivedAgain() {
        // given - the snapshot was already received and persisted once
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        receiveSnapshot(persistedSnapshot).persist().join();

        // when - typed as Future so that AssertJ's future assertions are selected
        final Future<ReceivedSnapshot> secondAttempt = receiverSnapshotStore.newReceivedSnapshot(persistedSnapshot.getId());

        // then
        Assertions.assertThat(secondAttempt)
            .failsWithin(Duration.ofMillis(100L))
            .withThrowableOfType(ExecutionException.class)
            .withCauseInstanceOf(SnapshotException.SnapshotAlreadyExistsException.class)
            .withMessageMatching("Expected to receive snapshot with id 1-0-1-0-0-\\S+, but was already persisted. This shouldn't happen.");
    }

    @Test
    public void shouldThrowExceptionOnPersistWhenNoChunkApplied() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);

        // when - then
        Assertions.assertThatThrownBy(() -> receivedSnapshot.persist().join())
            .hasCauseInstanceOf(NullPointerException.class);
    }

    @Test
    public void shouldBeAbleToAbortAfterPersistFails() {
        // given - only the first chunk is applied
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            receivedSnapshot.apply(snapshotChunkReader.next()).join();
        }
        Assertions.assertThatThrownBy(() -> receivedSnapshot.persist().join())
            .as("the received snapshot is partial and should not be persisted")
            .hasCauseInstanceOf(IllegalStateException.class);

        // when
        receivedSnapshot.abort().join();

        // then
        Assertions.assertThat(receivedSnapshot.getPath())
            .as("the corrupted pending snapshot was deleted on abort")
            .doesNotExist();
    }

    @Test
    public void shouldNotThrowExceptionOnAbortWhenNoChunkApplied() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);

        // when - then
        // join the returned future so that a failed abort is observed, not only a synchronous throw
        Assertions.assertThatCode(() -> receivedSnapshot.abort().join()).doesNotThrowAnyException();
    }

    @Test
    public void shouldReceiveConcurrentlyAndPersist() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot firstReceivedSnapshot = receiveSnapshot(persistedSnapshot);
        final ReceivedSnapshot secondReceivedSnapshot = receiveSnapshot(persistedSnapshot);

        // when
        final PersistedSnapshot receivedPersistedSnapshot = firstReceivedSnapshot.persist().join();

        // then
        Assertions.assertThat(firstReceivedSnapshot.getPath())
            .as("the first received snapshot was removed but the new was persist on the same directory")
            .exists();
        Assertions.assertThat(secondReceivedSnapshot.getPath())
            .as("the second received snapshot was not removed as it's not considered older")
            .exists();
        Assertions.assertThat(receivedPersistedSnapshot.getChecksums().sameChecksums(persistedSnapshot.getChecksums()))
            .as("the received, persisted snapshot have the same checksum as the persisted one")
            .isTrue();
    }

    @Test
    public void shouldDoNothingOnPersistOfAlreadyCommittedSnapshot() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = receiveSnapshot(persistedSnapshot);
        final ReceivedSnapshot otherReceivedSnapshot = receiveSnapshot(persistedSnapshot);

        // when
        final PersistedSnapshot otherReceivedPersisted = otherReceivedSnapshot.persist().join();
        final PersistedSnapshot receivedPersisted = receivedSnapshot.persist().join();

        // then
        Assertions.assertThat(receivedPersisted)
            .as("the last persisted snapshot is the same as the first one as they have the same ID")
            .isEqualTo(otherReceivedPersisted);
        Assertions.assertThat(receivedSnapshot.getPath())
            .as("the received snapshot was removed on persist of the other snapshot")
            .doesNotExist();
        Assertions.assertThat(receivedPersisted.getChecksums().sameChecksums(persistedSnapshot.getChecksums()))
            .as("the received, persisted snapshot have the same checksum as the persisted one")
            .isTrue();
    }

    @Test
    public void shouldCompleteExceptionallyOnConsumingChunkWithInvalidChunkChecksum() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);

        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            final SnapshotChunk originalChunk = snapshotChunkReader.next();
            final SnapshotChunk corruptedChunk = SnapshotChunkWrapper.withChecksum(originalChunk, 0xCAFEL);

            // when - then
            Assertions.assertThatThrownBy(() -> receivedSnapshot.apply(corruptedChunk).join())
                .as("the chunk should not be applied as its content checksum is not 0xCAFEL")
                .hasCauseInstanceOf(SnapshotWriteException.class);
        }
    }

    @Test
    public void shouldNotPersistWhenSnapshotIsPartial() {
        // given - only the first chunk is applied
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            final SnapshotChunk firstChunk = snapshotChunkReader.next();
            receivedSnapshot.apply(firstChunk).join();
        }

        // when - then
        Assertions.assertThatThrownBy(() -> receivedSnapshot.persist().join())
            .as("the snapshot should not be persisted as it's corrupted due to a missing chunk")
            .hasCauseInstanceOf(IllegalStateException.class);
    }

    @Test
    public void shouldCompleteExceptionallyOnConsumingChunkWithNotEqualTotalCount() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);

        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            final SnapshotChunk firstChunk = snapshotChunkReader.next();
            receivedSnapshot.apply(firstChunk).join();
            final SnapshotChunk corruptedChunk = SnapshotChunkWrapper.withTotalCount(snapshotChunkReader.next(), 55);

            // when - then
            Assertions.assertThatThrownBy(() -> receivedSnapshot.apply(corruptedChunk).join())
                .as("the second chunk should not be applied as it reports a different chunk count")
                .hasCauseInstanceOf(SnapshotWriteException.class);
        }
    }

    @Test
    public void shouldNotPersistWhenTotalCountIsWrong() {
        // given - every chunk is applied, but each one reports a total count of 1
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            while (snapshotChunkReader.hasNext()) {
                final SnapshotChunk corruptedChunk = SnapshotChunkWrapper.withTotalCount(snapshotChunkReader.next(), 1);
                receivedSnapshot.apply(corruptedChunk).join();
            }
        }

        // when
        final ActorFuture<PersistedSnapshot> persisted = receivedSnapshot.persist();

        // then
        Assertions.assertThatThrownBy(() -> persisted.join())
            .as("the snapshot should not be persisted as it's corrupted due to missing chunks, even if the total count was present")
            .hasCauseInstanceOf(IllegalStateException.class);
    }

    @Test
    public void shouldCompleteExceptionallyOnConsumingChunkWithNotEqualSnapshotId() {
        // given
        final PersistedSnapshot persistedSnapshot = takePersistedSnapshot();
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);

        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            final SnapshotChunk originalChunk = snapshotChunkReader.next();
            final SnapshotChunk corruptedChunk = SnapshotChunkWrapper.withSnapshotId(originalChunk, "id");

            // when - then
            Assertions.assertThatThrownBy(() -> receivedSnapshot.apply(corruptedChunk).join())
                .as("the chunk should not be applied since it has a different snapshot ID than expected")
                .hasCauseInstanceOf(SnapshotWriteException.class);
        }
    }

    /**
     * Opens a new received snapshot on the receiver store for the ID of the given snapshot, without
     * applying any chunk.
     */
    private ReceivedSnapshot startReceiving(final PersistedSnapshot persistedSnapshot) {
        return receiverSnapshotStore.newReceivedSnapshot(persistedSnapshot.getId()).join();
    }

    /**
     * Opens a new received snapshot on the receiver store and applies every chunk of the given
     * snapshot to it, waiting for each apply to complete. The result is not persisted.
     */
    private ReceivedSnapshot receiveSnapshot(final PersistedSnapshot persistedSnapshot) {
        final ReceivedSnapshot receivedSnapshot = startReceiving(persistedSnapshot);
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            while (snapshotChunkReader.hasNext()) {
                receivedSnapshot.apply(snapshotChunkReader.next()).join();
            }
        }
        return receivedSnapshot;
    }

    /**
     * Takes a transient snapshot on the sender store, fills it with
     * {@code SnapshotTransferUtil.SNAPSHOT_FILE_CONTENTS} and persists it.
     */
    private PersistedSnapshot takePersistedSnapshot() {
        final TransientSnapshot transientSnapshot = senderSnapshotStore.newTransientSnapshot(1L, 0L, 1L, 0L, false).get();
        transientSnapshot.take(p -> SnapshotTransferUtil.writeSnapshot(p, SnapshotTransferUtil.SNAPSHOT_FILE_CONTENTS)).join();
        return transientSnapshot.persist().join();
    }
}

