package io.camunda.zeebe.broker.backup;

import io.atomix.cluster.MemberId;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.partition.PartitionMetadata;
import io.atomix.raft.impl.LogCompactor;
import io.atomix.raft.metrics.RaftServiceMetrics;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.partition.RaftPartitionConfig;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.utils.concurrent.ThreadContext;
import io.camunda.zeebe.backup.management.BackupService;
import io.camunda.zeebe.broker.utils.InlineThreadContext;
import io.camunda.zeebe.journal.JournalMetaStore;
import io.camunda.zeebe.journal.file.SegmentedJournal;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.SchedulingHints;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStore;
import io.camunda.zeebe.test.DynamicAutoCloseable;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.buffer.DirectBufferWriter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AutoClose;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:io/camunda/zeebe/broker/backup/ConcurrentBackupCompactionTest.class */
public class ConcurrentBackupCompactionTest extends DynamicAutoCloseable {
    private static final String SNAPSHOT_FILE_NAME = "file1";

    @TempDir
    Path dataDirectory;
    private ActorScheduler actorScheduler;
    private SegmentedJournal journal;
    private FileBasedSnapshotStore snapshotStore;
    private InMemoryMockBackupStore backupStore;
    private BackupService backupService;
    private LogCompactor logCompactor;

    @Mock
    private RaftLog raftLog;

    @AutoClose
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    private final int nodeId = 1;
    private final int partitionId = 1;
    private final ThreadContext threadContext = new InlineThreadContext();
    private final RaftServiceMetrics raftMetrics = new RaftServiceMetrics("1", this.meterRegistry);

    @BeforeEach
    void setUp() {
        this.actorScheduler = manage(ActorScheduler.newActorScheduler().build());
        this.actorScheduler.start();
        this.backupStore = (InMemoryMockBackupStore) manage(new InMemoryMockBackupStore());
        this.snapshotStore = manage(new FileBasedSnapshotStore(0, 1, this.dataDirectory, path -> {
            return Map.of();
        }, this.meterRegistry));
        this.actorScheduler.submitActor(this.snapshotStore, SchedulingHints.IO_BOUND);
        this.journal = manage(SegmentedJournal.builder(this.meterRegistry).withDirectory(this.dataDirectory.toFile()).withName(new RaftPartition(new PartitionMetadata(PartitionId.from("raft", 1), Set.of(), Map.of(), 1, new MemberId("1")), (RaftPartitionConfig) null, this.dataDirectory.toFile(), this.meterRegistry).name()).withMetaStore((JournalMetaStore) Mockito.mock(JournalMetaStore.class)).withMaxSegmentSize(128).build());
        this.logCompactor = new LogCompactor(this.threadContext, this.raftLog, 3, this.raftMetrics);
        Mockito.when(Boolean.valueOf(this.raftLog.deleteUntil(ArgumentMatchers.anyLong()))).thenAnswer(invocationOnMock -> {
            return Boolean.valueOf(this.journal.deleteUntil(((Long) invocationOnMock.getArgument(0)).longValue()));
        });
        this.backupService = manage(new BackupService(1, 1, 1, this.backupStore, this.snapshotStore, this.dataDirectory, j -> {
            return CompletableFuture.completedFuture(this.journal.getTailSegments(j).values());
        }, this.meterRegistry));
        this.actorScheduler.submitActor(this.backupService);
    }

    @AfterEach
    public void tearDown() {
        close();
    }

    @Test
    public void shouldNotFailWhenCompactionTriggers() throws Exception {
        appendRecord(1L, "1");
        appendRecord(2L, "2");
        PersistedSnapshot takeSnapshot = takeSnapshot(2L, 2L);
        this.logCompactor.compactFromSnapshots(this.snapshotStore);
        Awaitility.await("compaction is done").until(() -> {
            return Boolean.valueOf(this.logCompactor.getCompactableIndex() == takeSnapshot.getIndex());
        });
        appendRecord(3L, "3");
        ActorFuture takeBackup = this.backupService.takeBackup(3L, 3L);
        Awaitility.await("snapshot is reserved").until(() -> {
            return (Boolean) this.snapshotStore.getLatestSnapshot().map((v0) -> {
                return v0.isReserved();
            }).orElse(false);
        });
        appendRecord(4L, "4");
        appendRecord(5L, "5");
        takeSnapshot(4L, 5L);
        this.logCompactor.compactFromSnapshots(this.snapshotStore);
        Awaitility.await("no compaction is done").until(() -> {
            return Boolean.valueOf(this.logCompactor.getCompactableIndex() == takeSnapshot.getIndex());
        });
        Stream<Path> list = Files.list(this.dataDirectory.resolve("snapshots"));
        try {
            Assertions.assertThat(list.toList().size()).isEqualTo(4L);
            if (list != null) {
                list.close();
            }
            this.journal.getTailSegments(3L).values().forEach(path -> {
                Assertions.assertThat(path).exists();
            });
            Assertions.assertThat(takeBackup.isDone()).isFalse();
            Awaitility.await("BackStore.save is called").atMost(Duration.ofSeconds(5L)).until(() -> {
                return Boolean.valueOf(!this.backupStore.backupInProgress().isEmpty());
            });
            this.backupStore.completeSaveFutures();
            ConditionFactory atMost = Awaitility.await("backup is completed successfully").atMost(Duration.ofSeconds(5L));
            Objects.requireNonNull(takeBackup);
            atMost.until(takeBackup::isDone);
        } catch (Throwable th) {
            if (list != null) {
                try {
                    list.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void appendRecord(long j, String str) {
        this.journal.append(j, new DirectBufferWriter().wrap(new UnsafeBuffer(str.getBytes())));
    }

    private PersistedSnapshot takeSnapshot(long j, long j2) {
        TransientSnapshot transientSnapshot = (TransientSnapshot) this.snapshotStore.newTransientSnapshot(j, 1L, 1L, 1L).get();
        transientSnapshot.take(path -> {
            try {
                FileUtil.ensureDirectoryExists(path);
                Files.write(path.resolve(SNAPSHOT_FILE_NAME), "This is the content".getBytes(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        return (PersistedSnapshot) transientSnapshot.withLastFollowupEventPosition(j2).persist().join();
    }
}
