/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.snapshots.transfer;

import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotStore;
import io.camunda.zeebe.snapshots.SnapshotChunk;
import io.camunda.zeebe.snapshots.SnapshotChunkReader;
import io.camunda.zeebe.snapshots.SnapshotException;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotChunkReader;
import io.camunda.zeebe.snapshots.transfer.SnapshotSenderService;
import io.camunda.zeebe.snapshots.transfer.SnapshotTransferService;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.agrona.CloseHelper;

public class SnapshotTransferServiceImpl
implements SnapshotSenderService {
    private final PersistedSnapshotStore snapshotStore;
    private final Map<UUID, PendingTransfer> pendingTransfers = new HashMap<UUID, PendingTransfer>();
    private final SnapshotTransferService.TakeSnapshot takeSnapshot;
    private final int partitionId;
    private final BiConsumer<Path, Path> copyForBootstrap;
    private final ConcurrencyControl concurrency;

    public SnapshotTransferServiceImpl(PersistedSnapshotStore snapshotStore, SnapshotTransferService.TakeSnapshot takeSnapshot, int partitionId, BiConsumer<Path, Path> copyForBootstrap, ConcurrencyControl concurrency) {
        this.snapshotStore = snapshotStore;
        this.takeSnapshot = takeSnapshot;
        this.partitionId = partitionId;
        this.copyForBootstrap = copyForBootstrap;
        this.concurrency = concurrency;
    }

    @Override
    public ActorFuture<SnapshotChunk> getLatestSnapshot(int partition, long lastProcessedPosition, UUID transferId) {
        if (partition != this.partitionId) {
            return CompletableActorFuture.completedExceptionally((Throwable)new IllegalArgumentException(String.format("[%s] Invalid partition: %d. Current partition is %d", transferId, partition, this.partitionId)));
        }
        return this.getLatestSnapshotForBootstrap(lastProcessedPosition).andThen(snapshot -> {
            if (snapshot == null) {
                return CompletableActorFuture.completed(null);
            }
            String snapshotId = snapshot.getId();
            try {
                FileBasedSnapshotChunkReader reader = new FileBasedSnapshotChunkReader(snapshot.getPath());
                PendingTransfer transfer = new PendingTransfer(snapshotId, null, reader);
                SnapshotChunk chunk = transfer.next();
                if (chunk == null) {
                    return CompletableActorFuture.completedExceptionally((Throwable)new IllegalArgumentException(String.format("[%s] No snapshot chunk found", transferId)));
                }
                this.pendingTransfers.put(transferId, transfer);
                return CompletableActorFuture.completed((Object)chunk);
            }
            catch (IOException e) {
                return CompletableActorFuture.completedExceptionally((Throwable)e);
            }
        }, (Executor)this.concurrency);
    }

    @Override
    public ActorFuture<SnapshotChunk> getNextChunk(int partition, String snapshotId, String previousChunkName, UUID transferId) {
        PendingTransfer transfer = this.pendingTransfers.get(transferId);
        if (transfer != null) {
            if (!transfer.snapshotId.equals(snapshotId)) {
                return CompletableActorFuture.completedExceptionally((Throwable)new IllegalArgumentException(String.format("[%s] Invalid snapshotId: %s. Expected: %s", transferId, snapshotId, transfer.snapshotId)));
            }
            if (!transfer.lastChunkName.equals(previousChunkName)) {
                return CompletableActorFuture.completedExceptionally((Throwable)new IllegalArgumentException(String.format("[%s] Invalid previousChunkName: %s. Expected: %s", transferId, previousChunkName, transfer.lastChunkName)));
            }
            return CompletableActorFuture.completed((Object)transfer.next());
        }
        return CompletableActorFuture.completedExceptionally((Throwable)new IllegalArgumentException(String.format("[%s] No transfer found for snapshotId %s", transferId, snapshotId)));
    }

    @Override
    public ActorFuture<Void> deleteSnapshots(int partitionId) {
        if (partitionId != this.partitionId) {
            return CompletableActorFuture.completedExceptionally((Throwable)new IllegalArgumentException(String.format("Invalid partition: %d. Current partition is %d", partitionId, this.partitionId)));
        }
        return this.snapshotStore.deleteBootstrapSnapshots().thenApply(ignored -> {
            this.pendingTransfers.clear();
            return null;
        });
    }

    private ActorFuture<PersistedSnapshot> getLatestSnapshotForBootstrap(long lastProcessedPosition) {
        ActorFuture lastSnapshotFuture = this.concurrency.createFuture();
        Optional lastSnapshot = this.snapshotStore.getBootstrapSnapshot();
        if (lastSnapshot.isEmpty()) {
            this.createSnapshotForBootstrap(lastProcessedPosition).onComplete((BiConsumer)lastSnapshotFuture, (Executor)this.concurrency);
        } else {
            lastSnapshotFuture.complete((Object)((PersistedSnapshot)lastSnapshot.get()));
        }
        return lastSnapshotFuture;
    }

    private ActorFuture<PersistedSnapshot> createSnapshotForBootstrap(long lastProcessedPosition) {
        Optional<PersistedSnapshot> lastPersistedSnapshot = this.snapshotStore.getLatestSnapshot();
        CompletableActorFuture lastSnapshot = lastPersistedSnapshot.isEmpty() || lastPersistedSnapshot.get().getMetadata().processedPosition() < lastProcessedPosition ? this.takeSnapshot.takeSnapshot(lastProcessedPosition) : CompletableActorFuture.completed((Object)lastPersistedSnapshot.get());
        return lastSnapshot.andThen(persistedSnapshot -> this.withReservation((PersistedSnapshot)persistedSnapshot, () -> this.snapshotStore.copyForBootstrap((PersistedSnapshot)persistedSnapshot, this.copyForBootstrap).andThen((snapshot, error) -> {
            if (error != null) {
                if (error instanceof SnapshotException.SnapshotAlreadyExistsException) {
                    return CompletableActorFuture.completed((Object)this.snapshotStore.getBootstrapSnapshot().orElse(null));
                }
                return CompletableActorFuture.completedExceptionally((Throwable)error);
            }
            return CompletableActorFuture.completed((Object)snapshot);
        }, (Executor)this.concurrency)), (Executor)this.concurrency);
    }

    <A> ActorFuture<A> withReservation(PersistedSnapshot persistedSnapshot, Supplier<ActorFuture<A>> supplier) {
        return persistedSnapshot.reserve().andThen(reservation -> {
            try {
                return ((ActorFuture)supplier.get()).andThen((result, error) -> reservation.release().andThen(ignored -> {
                    if (error != null) {
                        return CompletableActorFuture.completedExceptionally((Throwable)error);
                    }
                    return CompletableActorFuture.completed((Object)result);
                }, (Executor)this.concurrency), (Executor)this.concurrency);
            }
            catch (Exception e) {
                reservation.release();
                return CompletableActorFuture.completedExceptionally((Throwable)e);
            }
        }, (Executor)this.concurrency);
    }

    public ActorFuture<Void> closeAsync() {
        return this.deleteSnapshots(this.partitionId);
    }

    private static class PendingTransfer {
        private final String snapshotId;
        private String lastChunkName;
        private final SnapshotChunkReader reader;

        PendingTransfer(String snapshotId, String lastChunkName, SnapshotChunkReader reader) {
            this.snapshotId = snapshotId;
            this.lastChunkName = lastChunkName;
            this.reader = reader;
        }

        private SnapshotChunk next() {
            if (this.reader != null && this.reader.hasNext()) {
                SnapshotChunk next = (SnapshotChunk)this.reader.next();
                this.lastChunkName = next.getChunkName();
                return next;
            }
            CloseHelper.close((AutoCloseable)((Object)this.reader));
            return null;
        }
    }
}

