/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.snapshots.impl;

import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.snapshots.ImmutableChecksumsSFV;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import io.camunda.zeebe.snapshots.ReceivedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotChunk;
import io.camunda.zeebe.snapshots.SnapshotChunkReader;
import io.camunda.zeebe.snapshots.SnapshotChunkWrapper;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedReceivedSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStore;
import io.camunda.zeebe.snapshots.impl.SfvChecksumImpl;
import io.camunda.zeebe.snapshots.impl.SnapshotWriteException;
import io.camunda.zeebe.test.util.asserts.DirectoryAssert;
import io.camunda.zeebe.util.FileUtil;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractPathAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicReferenceAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Verifies how a {@link FileBasedSnapshotStore} handles snapshots that are received chunk by
 * chunk.
 *
 * <p>Every test works with two stores rooted in separate temporary folders: snapshots are taken
 * and persisted on the sender store, read back through a {@link SnapshotChunkReader}, and applied
 * to a {@link FileBasedReceivedSnapshot} created by the receiver store.
 */
public class FileBasedReceivedSnapshotTest {
    private static final String SNAPSHOT_DIRECTORY = "snapshots";
    private static final String PENDING_DIRECTORY = "pending";
    private static final int PARTITION_ID = 1;
    /** File name to file content of every snapshot written by {@link #writeSnapshot(Path)}. */
    private static final Map<String, String> SNAPSHOT_FILE_CONTENTS = Map.of("file1", "file1 contents", "file2", "file2 contents");
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public ActorSchedulerRule scheduler = new ActorSchedulerRule();
    private FileBasedSnapshotStore senderSnapshotStore;
    private FileBasedSnapshotStore receiverSnapshotStore;
    private Path receiverPendingDir;
    private Path receiverSnapshotsDir;

    /** Creates a fresh sender and receiver store, each below its own temporary root folder. */
    @Before
    public void beforeEach() throws Exception {
        final Path receiverRoot = this.temporaryFolder.newFolder("receiver").toPath();
        this.receiverPendingDir = receiverRoot.resolve(PENDING_DIRECTORY);
        this.receiverSnapshotsDir = receiverRoot.resolve(SNAPSHOT_DIRECTORY);
        this.receiverSnapshotStore = this.createStore(receiverRoot);

        final Path senderRoot = this.temporaryFolder.newFolder("sender").toPath();
        this.senderSnapshotStore = this.createStore(senderRoot);
    }

    @Test
    public void shouldNotCreatePendingDirectoryUntilFirstChunk() {
        // when - a received snapshot is created but no chunk is applied
        final ReceivedSnapshot receivedSnapshot = (ReceivedSnapshot) this.receiverSnapshotStore.newReceivedSnapshot("1-0-123-121-1").join();

        // then
        Assertions.assertThat(receivedSnapshot.getPath())
            .as("there is no pending snapshots until a chunk is applied")
            .doesNotExist();
    }

    @Test
    public void shouldStoreReceivedSnapshotInSnapshotDirectory() {
        // given
        final PersistedSnapshot persistedSnapshot = this.takePersistedSnapshot(1L);

        // when
        final ReceivedSnapshot receivedSnapshot = this.receiveSnapshot(persistedSnapshot);

        // then
        Assertions.assertThat(receivedSnapshot.getPath())
            .as("there exists a snapshot in the directory")
            .hasParent(this.receiverSnapshotsDir)
            .isDirectory();
    }

    @Test
    public void shouldReceiveChunk() {
        // given
        final PersistedSnapshot persistedSnapshot = this.takePersistedSnapshot(1L);
        final FileBasedReceivedSnapshot receivedSnapshot = this.startReceiving(persistedSnapshot);

        // when - only the first chunk is applied
        final SnapshotChunk expectedChunk;
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            expectedChunk = (SnapshotChunk) snapshotChunkReader.next();
            receivedSnapshot.apply(expectedChunk).join();
        }

        // then
        Assertions.assertThat(receivedSnapshot.getPath())
            .as("the received snapshot directory contains the applied chunk")
            .isDirectoryContaining(p -> p.getFileName().toString().equals(expectedChunk.getChunkName()));
        Assertions.assertThat(receivedSnapshot.getPath().resolve(expectedChunk.getChunkName()))
            .as("the received chunk should have the expected content")
            .hasBinaryContent(expectedChunk.getContent());
    }

    @Test
    public void shouldRemovePreviousSnapshotOnCommit() {
        // given - a first snapshot was received and persisted
        final PersistedSnapshot firstPersistedSnapshot = this.takePersistedSnapshot(1L);
        this.receiveSnapshot(firstPersistedSnapshot).persist().join();

        // when - a second snapshot is received and persisted
        final PersistedSnapshot secondPersistedSnapshot = this.takePersistedSnapshot(2L);
        final FileBasedSnapshot secondReceivedPersistedSnapshot = (FileBasedSnapshot) this.receiveSnapshot(secondPersistedSnapshot).persist().join();

        // then
        Assertions.assertThat(this.receiverSnapshotsDir)
            .asInstanceOf(DirectoryAssert.factory())
            .as("there is only the latest snapshot in the receiver's snapshot directory")
            .isDirectoryContainingExactly(new Path[]{secondReceivedPersistedSnapshot.getPath(), secondReceivedPersistedSnapshot.getChecksumPath()});
    }

    @Test
    public void shouldNotRemovePendingSnapshotOnCommittingSnapshotWhenHigher() {
        // given - two fully received, not yet persisted snapshots
        final PersistedSnapshot olderPersistedSnapshot = this.takePersistedSnapshot(1L);
        final ReceivedSnapshot olderReceivedSnapshot = this.receiveSnapshot(olderPersistedSnapshot);
        final PersistedSnapshot newPersistedSnapshot = this.takePersistedSnapshot(2L);
        final ReceivedSnapshot receivedSnapshot = this.receiveSnapshot(newPersistedSnapshot);

        // when - only the older one is persisted
        olderReceivedSnapshot.persist().join();

        // then
        Assertions.assertThat(this.receiverSnapshotsDir)
            .asInstanceOf(DirectoryAssert.factory())
            .as("the latest pending snapshot should not be deleted because it is newer than the persisted one")
            .isDirectoryContainingAllOf(new Path[]{olderReceivedSnapshot.getPath(), receivedSnapshot.getPath()});
    }

    @Test
    public void shouldPersistOnPartialSnapshotOnInvalidChecksumPersist() {
        // given - a copy of the sender's snapshot that carries a freshly created, different checksum object
        final FileBasedSnapshot persistedSnapshot = (FileBasedSnapshot) this.takePersistedSnapshot(1L);
        final FileBasedSnapshot corruptedSnapshot = new FileBasedSnapshot(persistedSnapshot.getDirectory(), persistedSnapshot.getChecksumPath(), (ImmutableChecksumsSFV) new SfvChecksumImpl(), persistedSnapshot.getSnapshotId(), null, s -> {}, null);

        // when
        final ReceivedSnapshot receivedSnapshot = this.receiveSnapshot((PersistedSnapshot) corruptedSnapshot);
        final PersistedSnapshot didPersist = (PersistedSnapshot) receivedSnapshot.persist().join();

        // then
        Assertions.assertThat(didPersist)
            .as("The snapshot should persist with mis-match in combined checksums")
            .isEqualTo(this.receiverSnapshotStore.getLatestSnapshot().get());
    }

    @Test
    public void shouldNotifyListenersOnNewSnapshot() {
        // given
        final AtomicReference<PersistedSnapshot> snapshotRef = new AtomicReference<>();
        final PersistedSnapshotListener listener = snapshotRef::set;
        this.receiverSnapshotStore.addSnapshotListener(listener);

        // when
        final PersistedSnapshot sentSnapshot = this.takePersistedSnapshot(1L);
        final PersistedSnapshot persistedSnapshot = (PersistedSnapshot) this.receiveSnapshot(sentSnapshot).persist().join();

        // then
        Assertions.assertThat(snapshotRef)
            .as("the listener was called with the correct new snapshot reference")
            .hasValue(persistedSnapshot);
    }

    @Test
    public void shouldNotNotifyListenersOnNewSnapshotWhenRemoved() {
        // given - the listener is registered and removed again on BOTH stores. Registering it only
        // on the sender would leave the receiver-side persist below unchecked by the assertion.
        final AtomicReference<PersistedSnapshot> snapshotRef = new AtomicReference<>();
        final PersistedSnapshotListener listener = snapshotRef::set;
        this.senderSnapshotStore.addSnapshotListener(listener);
        this.senderSnapshotStore.removeSnapshotListener(listener);
        this.receiverSnapshotStore.addSnapshotListener(listener);
        this.receiverSnapshotStore.removeSnapshotListener(listener);

        // when - a snapshot is persisted on the sender and then on the receiver
        final PersistedSnapshot persistedSnapshot = this.takePersistedSnapshot(1L);
        this.receiveSnapshot(persistedSnapshot).persist().join();

        // then
        Assertions.assertThat(snapshotRef)
            .as("the listener was never called and the ref value is still null")
            .hasValue(null);
    }

    @Test
    public void shouldNotWriteChunkWithInvalidChunkChecksum() {
        // given
        final PersistedSnapshot persistedSnapshot = this.takePersistedSnapshot(1L);
        final FileBasedReceivedSnapshot receivedSnapshot = this.startReceiving(persistedSnapshot);

        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            final SnapshotChunk firstChunk = (SnapshotChunk) snapshotChunkReader.next();
            receivedSnapshot.apply(firstChunk).join();

            // when - the same chunk is offered again with a different checksum
            final SnapshotChunk corruptedChunk = SnapshotChunkWrapper.withChecksum(firstChunk, 51966L);

            // then
            Assertions.assertThatThrownBy(() -> receivedSnapshot.apply(corruptedChunk).join())
                .hasCauseInstanceOf(SnapshotWriteException.class)
                .hasMessageContaining("Expected to have checksum 51966 for snapshot chunk file1");
        }
    }

    @Test
    public void shouldNotWriteChunkWithWrongTotalChunkCount() {
        // given
        final PersistedSnapshot persistedSnapshot = this.takePersistedSnapshot(1L);
        final FileBasedReceivedSnapshot receivedSnapshot = this.startReceiving(persistedSnapshot);

        final SnapshotChunk firstChunk;
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            firstChunk = (SnapshotChunk) snapshotChunkReader.next();
            receivedSnapshot.apply(firstChunk).join();

            // when - the second chunk reports a different total chunk count
            final SnapshotChunk corruptedChunk = SnapshotChunkWrapper.withTotalCount((SnapshotChunk) snapshotChunkReader.next(), 55);

            // then
            Assertions.assertThatThrownBy(() -> receivedSnapshot.apply(corruptedChunk).join())
                .hasCauseInstanceOf(SnapshotWriteException.class);
        }
        this.assertContainsOnlyChunk(receivedSnapshot, firstChunk);
    }

    @Test
    public void shouldNotWriteChunkOnInvalidId() {
        // given
        final PersistedSnapshot persistedSnapshot = this.takePersistedSnapshot(1L);
        final FileBasedReceivedSnapshot receivedSnapshot = this.startReceiving(persistedSnapshot);

        final SnapshotChunk firstChunk;
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            firstChunk = (SnapshotChunk) snapshotChunkReader.next();
            receivedSnapshot.apply(firstChunk).join();

            // when - the second chunk claims to belong to another snapshot id
            final SnapshotChunk corruptedChunk = SnapshotChunkWrapper.withSnapshotId((SnapshotChunk) snapshotChunkReader.next(), "id");
            final ActorFuture<?> future = receivedSnapshot.apply(corruptedChunk);

            // then
            Assertions.assertThatThrownBy(() -> future.get())
                .hasCauseInstanceOf(SnapshotWriteException.class);
        }
        this.assertContainsOnlyChunk(receivedSnapshot, firstChunk);
    }

    @Test
    public void shouldPersistsMetadata() {
        // given
        final FileBasedSnapshot snapshotToSend = (FileBasedSnapshot) this.takePersistedSnapshot(1L);

        // when
        final ReceivedSnapshot receivedSnapshot = this.receiveSnapshot((PersistedSnapshot) snapshotToSend);
        final PersistedSnapshot persistedSnapshot = (PersistedSnapshot) receivedSnapshot.persist().join();

        // then
        Assertions.assertThat(persistedSnapshot.getMetadata()).isEqualTo(snapshotToSend.getMetadata());
        Assertions.assertThat(persistedSnapshot.getPath())
            .describedAs("Metadata file is persisted in snapshot path")
            .isDirectoryContaining(name -> name.getFileName().toString().equals("zeebe.metadata"));
    }

    @Test
    public void shouldReceiveSnapshotCorrectlyWhenFilesAreChunked() throws IOException {
        // given
        final PersistedSnapshot persistedSnapshot = this.takePersistedSnapshot(1L);
        final FileBasedReceivedSnapshot receivedSnapshot = this.startReceiving(persistedSnapshot);

        // when - the reader is limited to a maximum chunk size of 2, far smaller than any file
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            snapshotChunkReader.setMaximumChunkSize(2);
            while (snapshotChunkReader.hasNext()) {
                receivedSnapshot.apply((SnapshotChunk) snapshotChunkReader.next()).join();
            }
        }
        receivedSnapshot.persist().join();

        // then - every received file is byte-identical to the file of the same name on the sender
        try (Stream<Path> files = Files.list(receivedSnapshot.getPath())) {
            files.forEach(receivedFile -> assertSameContent(receivedFile, persistedSnapshot.getPath()));
        }
    }

    /**
     * Asserts that {@code receivedFile} has the same bytes as the file with the same name inside
     * {@code sourceDirectory}.
     *
     * @throws UncheckedIOException if either file cannot be read
     */
    private static void assertSameContent(final Path receivedFile, final Path sourceDirectory) {
        final String fileName = receivedFile.getFileName().toString();
        try {
            final byte[] receivedBytes = Files.readAllBytes(receivedFile);
            final byte[] sourceBytes = Files.readAllBytes(sourceDirectory.resolve(fileName));
            Assertions.assertThat(receivedBytes)
                .as("content of received file %s", fileName)
                .isEqualTo(sourceBytes);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Asserts that the directory of {@code receivedSnapshot} holds the file of {@code chunk} and nothing else. */
    private void assertContainsOnlyChunk(final FileBasedReceivedSnapshot receivedSnapshot, final SnapshotChunk chunk) {
        Assertions.assertThat(receivedSnapshot.getPath())
            .asInstanceOf(DirectoryAssert.factory())
            .as("the received snapshot should contain only the first chunk")
            .isDirectoryContainingExactly(new Path[]{receivedSnapshot.getPath().resolve(chunk.getChunkName())});
    }

    /** Creates, on the receiver store, an empty received snapshot with the id of {@code persistedSnapshot}. */
    private FileBasedReceivedSnapshot startReceiving(final PersistedSnapshot persistedSnapshot) {
        return (FileBasedReceivedSnapshot) this.receiverSnapshotStore.newReceivedSnapshot(persistedSnapshot.getId()).join();
    }

    /**
     * Applies every chunk of {@code persistedSnapshot} to a new received snapshot on the receiver
     * store, waiting for each chunk to be applied before sending the next one.
     *
     * @return the received snapshot; it has not been persisted yet
     */
    private ReceivedSnapshot receiveSnapshot(final PersistedSnapshot persistedSnapshot) {
        final FileBasedReceivedSnapshot receivedSnapshot = this.startReceiving(persistedSnapshot);
        try (SnapshotChunkReader snapshotChunkReader = persistedSnapshot.newChunkReader()) {
            while (snapshotChunkReader.hasNext()) {
                receivedSnapshot.apply((SnapshotChunk) snapshotChunkReader.next()).join();
            }
        }
        return receivedSnapshot;
    }

    /**
     * Takes a snapshot at {@code index} on the sender store, fills it via
     * {@link #writeSnapshot(Path)} and persists it.
     */
    private PersistedSnapshot takePersistedSnapshot(final long index) {
        final TransientSnapshot transientSnapshot = (TransientSnapshot) this.senderSnapshotStore.newTransientSnapshot(index, 0L, 1L, 0L, false).get();
        transientSnapshot.take(this::writeSnapshot).join();
        return (PersistedSnapshot) transientSnapshot.withLastFollowupEventPosition(100L).persist().join();
    }

    /**
     * Writes every entry of {@link #SNAPSHOT_FILE_CONTENTS} as a new UTF-8 encoded file below
     * {@code path}.
     *
     * @return always {@code true}
     * @throws UncheckedIOException if the directory or one of the files cannot be created
     */
    private boolean writeSnapshot(final Path path) {
        try {
            FileUtil.ensureDirectoryExists(path);
            for (final Map.Entry<String, String> entry : SNAPSHOT_FILE_CONTENTS.entrySet()) {
                final Path file = path.resolve(entry.getKey());
                final byte[] fileContent = entry.getValue().getBytes(StandardCharsets.UTF_8);
                // CREATE_NEW fails if the file already exists, so a snapshot is never silently overwritten
                Files.write(file, fileContent, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }

    /** Creates a store rooted at {@code root} and submits it to the test actor scheduler. */
    private FileBasedSnapshotStore createStore(final Path root) {
        final FileBasedSnapshotStore store = new FileBasedSnapshotStore(0, 1, root, snapshotPath -> Map.of(), (MeterRegistry) new SimpleMeterRegistry());
        this.scheduler.submitActor((Actor) store);
        return store;
    }
}

