/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.snapshots.transfer;

import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.ReceivableSnapshotStore;
import io.camunda.zeebe.snapshots.ReceivedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotChunk;
import io.camunda.zeebe.snapshots.impl.SnapshotMetrics;
import io.camunda.zeebe.snapshots.transfer.SnapshotTransfer;
import io.camunda.zeebe.snapshots.transfer.SnapshotTransferService;
import io.camunda.zeebe.util.CloseableSilently;
import io.camunda.zeebe.util.VisibleForTesting;
import io.camunda.zeebe.util.collection.Tuple;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;

public class SnapshotTransferImpl
extends Actor
implements SnapshotTransfer {
    private final SnapshotTransferService service;
    private final ReceivableSnapshotStore snapshotStore;
    private final SnapshotMetrics snapshotMetrics;
    private final ConcurrentHashMap<UUID, CloseableSilently> timers = new ConcurrentHashMap();

    public SnapshotTransferImpl(Function<ConcurrencyControl, SnapshotTransferService> service, SnapshotMetrics snapshotMetrics, ReceivableSnapshotStore snapshotStore) {
        this.snapshotMetrics = snapshotMetrics;
        this.snapshotStore = snapshotStore;
        this.service = service.apply((ConcurrencyControl)this);
    }

    @Override
    public ActorFuture<PersistedSnapshot> getLatestSnapshot(int partitionId) {
        UUID transferId = UUID.randomUUID();
        this.timers.put(transferId, this.snapshotMetrics.startTransferTimer(true));
        ActorFuture result = this.service.getLatestSnapshot(partitionId, -1L, transferId).andThen(snapshot -> {
            if (snapshot == null) {
                return CompletableActorFuture.completed(null);
            }
            return this.snapshotStore.newReceivedSnapshot(snapshot.getSnapshotId()).thenApply(fbsnapshot -> {
                fbsnapshot.apply((SnapshotChunk)snapshot);
                return new Tuple(snapshot, fbsnapshot);
            });
        }, (Executor)this.actor).andThen(tuple -> {
            if (tuple == null) {
                return CompletableActorFuture.completed(null);
            }
            ActorFuture<PersistedSnapshot> future = this.receiveAllChunks(partitionId, (SnapshotChunk)tuple.getLeft(), (ReceivedSnapshot)tuple.getRight(), transferId);
            future.onError(error -> ((ReceivedSnapshot)tuple.getRight()).abort());
            return future;
        }, (Executor)this.actor);
        result.onComplete((ignored, error) -> Optional.ofNullable(this.timers.remove(transferId)).ifPresent(CloseableSilently::close));
        return result;
    }

    @VisibleForTesting
    SnapshotTransferService snapshotTransferService() {
        return this.service;
    }

    private ActorFuture<PersistedSnapshot> receiveAllChunks(int partitionId, SnapshotChunk snapshotChunk, ReceivedSnapshot receivedSnapshot, UUID transferId) {
        return this.service.getNextChunk(partitionId, receivedSnapshot.snapshotId().getSnapshotIdAsString(), snapshotChunk.getChunkName(), transferId).andThen(chunk -> {
            if (chunk != null) {
                receivedSnapshot.apply((SnapshotChunk)chunk);
                return this.receiveAllChunks(partitionId, (SnapshotChunk)chunk, receivedSnapshot, transferId);
            }
            return receivedSnapshot.persist();
        }, (Executor)this.actor);
    }
}

