/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch6.shaded.org.elasticsearch.indices.recovery;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.index.CorruptIndexException;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.index.IndexCommit;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.store.IOContext;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.store.IndexInput;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.util.ArrayUtil;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ExceptionsHelper;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.Version;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.ActionListener;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.StepListener;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.routing.ShardRouting;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.CheckedSupplier;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.StopWatch;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.bytes.BytesArray;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.collect.Tuple;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.lease.Releasable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.logging.Loggers;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.unit.ByteSizeValue;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.CancellableThreads;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.FutureUtils;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.core.internal.io.IOUtils;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.engine.Engine;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.engine.RecoveryEngineException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.seqno.RetentionLeases;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.seqno.SequenceNumbers;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.shard.IndexShard;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.shard.IndexShardClosedException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.shard.IndexShardState;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.store.Store;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.store.StoreFileMetaData;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.Translog;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.indices.recovery.DelayRecoveryException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.indices.recovery.RecoveryResponse;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.indices.recovery.RecoveryTargetHandler;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.RemoteTransportException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

public class RecoverySourceHandler {
    protected final Logger logger;
    private final IndexShard shard;
    private final int shardId;
    private final StartRecoveryRequest request;
    private final int chunkSizeInBytes;
    private final RecoveryTargetHandler recoveryTarget;
    private final int maxConcurrentFileChunks;
    private final CancellableThreads cancellableThreads = new CancellableThreads();

    public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks) {
        this.shard = shard;
        this.recoveryTarget = recoveryTarget;
        this.request = request;
        this.shardId = this.request.shardId().id();
        this.logger = Loggers.getLogger(this.getClass(), request.shardId(), "recover to " + request.targetNode().getName());
        this.chunkSizeInBytes = fileChunkSizeInBytes;
        this.maxConcurrentFileChunks = request.targetNode().getVersion().onOrAfter(Version.V_6_7_0) ? maxConcurrentFileChunks : 1;
    }

    public StartRecoveryRequest getRequest() {
        return this.request;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
        CopyOnWriteArrayList<Closeable> resources = new CopyOnWriteArrayList<Closeable>();
        Closeable releaseResources = () -> IOUtils.close(resources);
        ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
        try {
            SendFileResult sendFileResult;
            long startingSeqNo;
            boolean isSequenceNumberBasedRecovery;
            this.cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
                ElasticsearchException e = this.shard.state() == IndexShardState.CLOSED ? new IndexShardClosedException(this.shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]") : new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
                if (beforeCancelEx != null) {
                    e.addSuppressed(beforeCancelEx);
                }
                IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
                throw e;
            });
            Consumer<Exception> onFailure = e -> IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure((Exception)e));
            RecoverySourceHandler.runUnderPrimaryPermit(() -> {
                IndexShardRoutingTable routingTable = this.shard.getReplicationGroup().getRoutingTable();
                ShardRouting targetShardRouting = routingTable.getByAllocationId(this.request.targetAllocationId());
                if (targetShardRouting == null) {
                    this.logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", (Object)this.request.shardId(), (Object)this.request.targetNode());
                    throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
                }
                assert (targetShardRouting.initializing()) : "expected recovery target to be initializing but was " + targetShardRouting;
            }, this.shardId + " validating recovery target [" + this.request.targetAllocationId() + "] registered ", this.shard, this.cancellableThreads, this.logger);
            Closeable retentionLock = this.shard.acquireRetentionLock();
            resources.add(retentionLock);
            boolean bl = isSequenceNumberBasedRecovery = this.request.startingSeqNo() != -2L && this.isTargetSameHistory() && this.shard.hasCompleteHistoryOperations("peer-recovery", this.request.startingSeqNo());
            if (isSequenceNumberBasedRecovery) {
                this.logger.trace("performing sequence numbers based recovery. starting at [{}]", (Object)this.request.startingSeqNo());
                startingSeqNo = this.request.startingSeqNo();
                sendFileResult = SendFileResult.EMPTY;
            } else {
                Engine.IndexCommitRef phase1Snapshot;
                try {
                    phase1Snapshot = this.shard.acquireSafeIndexCommit();
                }
                catch (Exception e2) {
                    throw new RecoveryEngineException(this.shard.shardId(), 1, "snapshot failed", e2);
                }
                startingSeqNo = this.shard.indexSettings().isSoftDeleteEnabled() ? Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get("local_checkpoint")) + 1L : 0L;
                try {
                    int estimateNumOps = this.shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
                    sendFileResult = this.phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
                }
                catch (Exception e3) {
                    try {
                        throw new RecoveryEngineException(this.shard.shardId(), 1, "phase1 failed", e3);
                    }
                    catch (Throwable throwable) {
                        try {
                            IOUtils.close(phase1Snapshot);
                            throw throwable;
                        }
                        catch (IOException ex) {
                            this.logger.warn("releasing snapshot caused exception", (Throwable)ex);
                        }
                        throw throwable;
                    }
                }
                try {
                    IOUtils.close(phase1Snapshot);
                }
                catch (IOException ex) {
                    this.logger.warn("releasing snapshot caused exception", (Throwable)ex);
                }
            }
            assert (startingSeqNo >= 0L) : "startingSeqNo must be non negative. got: " + startingSeqNo;
            StepListener<TimeValue> prepareEngineStep = new StepListener<TimeValue>();
            this.prepareTargetForTranslog(!isSequenceNumberBasedRecovery, this.shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
            StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<SendSnapshotResult>();
            prepareEngineStep.whenComplete(prepareEngineTime -> {
                RecoverySourceHandler.runUnderPrimaryPermit(() -> this.shard.initiateTracking(this.request.targetAllocationId()), this.shardId + " initiating tracking of " + this.request.targetAllocationId(), this.shard, this.cancellableThreads, this.logger);
                long endingSeqNo = this.shard.seqNoStats().getMaxSeqNo();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("snapshot translog for recovery; current size is [{}]", (Object)this.shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
                }
                Translog.Snapshot phase2Snapshot = this.shard.getHistoryOperations("peer-recovery", startingSeqNo);
                resources.add(phase2Snapshot);
                retentionLock.close();
                long maxSeenAutoIdTimestamp = this.shard.getMaxSeenAutoIdTimestamp();
                long maxSeqNoOfUpdatesOrDeletes = this.shard.getMaxSeqNoOfUpdatesOrDeletes();
                RetentionLeases retentionLeases = this.shard.getRetentionLeases();
                this.phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, sendSnapshotStep);
                sendSnapshotStep.whenComplete(r -> IOUtils.close(phase2Snapshot), e -> {
                    IOUtils.closeWhileHandlingException(phase2Snapshot);
                    onFailure.accept(new RecoveryEngineException(this.shard.shardId(), 2, "phase2 failed", (Throwable)e));
                });
            }, onFailure);
            StepListener<Void> finalizeStep = new StepListener<Void>();
            sendSnapshotStep.whenComplete(r -> this.finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
            finalizeStep.whenComplete(r -> {
                long phase1ThrottlingWaitTime = 0L;
                SendSnapshotResult sendSnapshotResult = (SendSnapshotResult)sendSnapshotStep.result();
                RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize, sendFileResult.existingTotalSize, sendFileResult.took.millis(), 0L, ((TimeValue)prepareEngineStep.result()).millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
                try {
                    wrappedListener.onResponse(response);
                }
                finally {
                    IOUtils.close(resources);
                }
            }, onFailure);
            return;
        }
        catch (Exception e4) {
            IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e4));
        }
    }

    private boolean isTargetSameHistory() {
        String targetHistoryUUID = this.request.metadataSnapshot().getHistoryUUID();
        assert (targetHistoryUUID != null || this.shard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) : "incoming target history N/A but index was created after or on 6.0.0-rc1";
        return targetHistoryUUID != null && targetHistoryUUID.equals(this.shard.getHistoryUUID());
    }

    static void runUnderPrimaryPermit(CancellableThreads.Interruptible runnable, String reason, IndexShard primary, CancellableThreads cancellableThreads, Logger logger) {
        cancellableThreads.execute(() -> {
            final CompletableFuture permit = new CompletableFuture();
            ActionListener<Releasable> onAcquired = new ActionListener<Releasable>(){

                @Override
                public void onResponse(Releasable releasable) {
                    if (!permit.complete(releasable)) {
                        releasable.close();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    permit.completeExceptionally(e);
                }
            };
            primary.acquirePrimaryOperationPermit(onAcquired, "same", reason);
            try (Releasable ignored = (Releasable)FutureUtils.get(permit);){
                if (primary.isRelocatedPrimary()) {
                    throw new IndexShardRelocatedException(primary.shardId());
                }
                runnable.run();
            }
            finally {
                permit.whenComplete((r, e) -> {
                    if (r != null) {
                        r.close();
                    }
                    if (e != null) {
                        logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
                    }
                });
            }
        });
    }

    public SendFileResult phase1(IndexCommit snapshot, Supplier<Integer> translogOps) {
        this.cancellableThreads.checkForCancel();
        long totalSize = 0L;
        long existingTotalSize = 0L;
        ArrayList<String> phase1FileNames = new ArrayList<String>();
        ArrayList<Long> phase1FileSizes = new ArrayList<Long>();
        ArrayList<String> phase1ExistingFileNames = new ArrayList<String>();
        ArrayList<Long> phase1ExistingFileSizes = new ArrayList<Long>();
        Store store = this.shard.store();
        store.incRef();
        try {
            Store.MetadataSnapshot recoverySourceMetadata;
            StopWatch stopWatch = new StopWatch().start();
            try {
                recoverySourceMetadata = store.getMetadata(snapshot);
            }
            catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
                this.shard.failShard("recovery", ex);
                throw ex;
            }
            for (String name : snapshot.getFileNames()) {
                StoreFileMetaData md = recoverySourceMetadata.get(name);
                if (md != null) continue;
                this.logger.info("Snapshot differs from actual index for file: {} meta: {}", (Object)name, recoverySourceMetadata.asMap());
                throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " + recoverySourceMetadata.asMap().size() + " files", name);
            }
            Version indexVersionCreated = this.shard.indexSettings().getIndexVersionCreated();
            if (!this.canSkipPhase1(indexVersionCreated, recoverySourceMetadata, this.request.metadataSnapshot())) {
                Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(this.request.metadataSnapshot());
                for (StoreFileMetaData storeFileMetaData : diff.identical) {
                    phase1ExistingFileNames.add(storeFileMetaData.name());
                    phase1ExistingFileSizes.add(storeFileMetaData.length());
                    existingTotalSize += storeFileMetaData.length();
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}], size [{}]", (Object)storeFileMetaData.name(), (Object)storeFileMetaData.checksum(), (Object)storeFileMetaData.length());
                    }
                    totalSize += storeFileMetaData.length();
                }
                ArrayList<StoreFileMetaData> phase1Files = new ArrayList<StoreFileMetaData>(diff.different.size() + diff.missing.size());
                phase1Files.addAll(diff.different);
                phase1Files.addAll(diff.missing);
                for (StoreFileMetaData md : phase1Files) {
                    if (this.request.metadataSnapshot().asMap().containsKey(md.name())) {
                        this.logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", (Object)md.name(), (Object)this.request.metadataSnapshot().asMap().get(md.name()), (Object)md);
                    } else {
                        this.logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", (Object)md.name());
                    }
                    phase1FileNames.add(md.name());
                    phase1FileSizes.add(md.length());
                    totalSize += md.length();
                }
                this.logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", (Object)phase1FileNames.size(), (Object)new ByteSizeValue(totalSize), (Object)phase1ExistingFileNames.size(), (Object)new ByteSizeValue(existingTotalSize));
                this.cancellableThreads.execute(() -> this.recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, (Integer)translogOps.get()));
                this.sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
                try {
                    this.cancellableThreads.executeIO(() -> this.recoveryTarget.cleanFiles((Integer)translogOps.get(), recoverySourceMetadata));
                }
                catch (IOException | RemoteTransportException exception) {
                    IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(exception);
                    if (corruptIndexException != null) {
                        try {
                            Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
                            StoreFileMetaData[] metadata = (StoreFileMetaData[])StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new);
                            ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length));
                            for (StoreFileMetaData md : metadata) {
                                this.cancellableThreads.checkForCancel();
                                this.logger.debug("checking integrity for file {} after remove corruption exception", (Object)md);
                                if (store.checkIntegrityNoException(md)) continue;
                                this.shard.failShard("recovery", corruptIndexException);
                                this.logger.warn("Corrupted file detected {} checksum mismatch", (Object)md);
                                throw corruptIndexException;
                            }
                        }
                        catch (IOException ex) {
                            exception.addSuppressed(ex);
                            throw exception;
                        }
                        RemoteTransportException exception2 = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
                        exception2.addSuppressed(exception);
                        this.logger.warn(() -> new ParameterizedMessage("{} Remote file corruption during finalization of recovery on node {}. local checksum OK", (Object)this.shard.shardId(), (Object)this.request.targetNode()), (Throwable)corruptIndexException);
                        throw exception2;
                    }
                    throw exception;
                }
            }
            this.logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target", (Object)recoverySourceMetadata.getSyncId());
            TimeValue took = stopWatch.totalTime();
            this.logger.trace("recovery [phase1]: took [{}]", (Object)took);
            SendFileResult sendFileResult = new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames, phase1ExistingFileSizes, existingTotalSize, took);
            return sendFileResult;
        }
        catch (Exception e) {
            throw new RecoverFilesRecoveryException(this.request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSize), e);
        }
        finally {
            store.decRef();
        }
    }

    boolean canSkipPhase1(Version indexCreatedVersion, Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
        if (source.getSyncId() == null || !source.getSyncId().equals(target.getSyncId())) {
            return false;
        }
        if (source.getNumDocs() != target.getNumDocs()) {
            throw new IllegalStateException("try to recover " + this.request.shardId() + " from primary shard with sync id but number of docs differ: " + source.getNumDocs() + " (" + this.request.sourceNode().getName() + ", primary) vs " + target.getNumDocs() + "(" + this.request.targetNode().getName() + ")");
        }
        SequenceNumbers.CommitInfo sourceSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(source.getCommitUserData().entrySet());
        SequenceNumbers.CommitInfo targetSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(target.getCommitUserData().entrySet());
        if (sourceSeqNos.localCheckpoint != targetSeqNos.localCheckpoint || targetSeqNos.maxSeqNo != sourceSeqNos.maxSeqNo) {
            if (indexCreatedVersion.before(Version.V_6_0_0) && !target.getCommitUserData().containsKey("local_checkpoint") && !target.getCommitUserData().containsKey("max_seq_no")) {
                return false;
            }
            String message = "try to recover " + this.request.shardId() + " with sync id but seq_no stats are mismatched: [" + source.getCommitUserData() + "] vs [" + target.getCommitUserData() + "]";
            assert (false) : message;
            throw new IllegalStateException(message);
        }
        return true;
    }

    void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
        StopWatch stopWatch = new StopWatch().start();
        ActionListener<Void> wrappedListener = ActionListener.wrap(nullVal -> {
            stopWatch.stop();
            TimeValue tookTime = stopWatch.totalTime();
            this.logger.trace("recovery [phase1]: remote engine start took [{}]", (Object)tookTime);
            listener.onResponse(tookTime);
        }, e -> listener.onFailure(new RecoveryEngineException(this.shard.shardId(), 1, "prepare target for translog failed", (Throwable)e)));
        this.logger.trace("recovery [phase1]: prepare remote engine for translog");
        this.cancellableThreads.execute(() -> this.recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener));
    }

    void phase2(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, ActionListener<SendSnapshotResult> listener) throws IOException {
        if (this.shard.state() == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(this.request.shardId());
        }
        this.logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");
        AtomicInteger skippedOps = new AtomicInteger();
        AtomicInteger totalSentOps = new AtomicInteger();
        AtomicInteger lastBatchCount = new AtomicInteger();
        CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
            Translog.Snapshot snapshot2 = snapshot;
            synchronized (snapshot2) {
                Translog.Operation operation;
                ArrayList<Translog.Operation> ops = lastBatchCount.get() > 0 ? new ArrayList<Translog.Operation>(lastBatchCount.get()) : new ArrayList();
                long batchSizeInBytes = 0L;
                while ((operation = snapshot.next()) != null) {
                    if (this.shard.state() == IndexShardState.CLOSED) {
                        throw new IndexShardClosedException(this.request.shardId());
                    }
                    this.cancellableThreads.checkForCancel();
                    long seqNo = operation.seqNo();
                    if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
                        skippedOps.incrementAndGet();
                        continue;
                    }
                    ops.add(operation);
                    totalSentOps.incrementAndGet();
                    if ((batchSizeInBytes += operation.estimateSize()) < (long)this.chunkSizeInBytes) continue;
                    break;
                }
                lastBatchCount.set(ops.size());
                return ops;
            }
        };
        StopWatch stopWatch = new StopWatch().start();
        ActionListener<Long> batchedListener = ActionListener.wrap(targetLocalCheckpoint -> {
            assert (snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()) : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
            stopWatch.stop();
            TimeValue tookTime = stopWatch.totalTime();
            this.logger.trace("recovery [phase2]: took [{}]", (Object)tookTime);
            listener.onResponse(new SendSnapshotResult((long)targetLocalCheckpoint, totalSentOps.get(), tookTime));
        }, listener::onFailure);
        this.sendBatch(readNextBatch, true, -2L, snapshot.totalOperations(), maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, batchedListener);
    }

    private void sendBatch(CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, boolean firstBatch, long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, ActionListener<Long> listener) throws IOException {
        List<Translog.Operation> operations = nextBatch.get();
        if (!operations.isEmpty() || firstBatch) {
            this.cancellableThreads.execute(() -> this.recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, ActionListener.wrap(newCheckpoint -> this.sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener), listener::onFailure)));
        } else {
            listener.onResponse(targetLocalCheckpoint);
        }
    }

    void finalizeRecovery(long targetLocalCheckpoint, ActionListener<Void> listener) throws IOException {
        if (this.shard.state() == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(this.request.shardId());
        }
        this.cancellableThreads.checkForCancel();
        StopWatch stopWatch = new StopWatch().start();
        this.logger.trace("finalizing recovery");
        RecoverySourceHandler.runUnderPrimaryPermit(() -> this.shard.markAllocationIdAsInSync(this.request.targetAllocationId(), targetLocalCheckpoint), this.shardId + " marking " + this.request.targetAllocationId() + " as in sync", this.shard, this.cancellableThreads, this.logger);
        long globalCheckpoint = this.shard.getGlobalCheckpoint();
        StepListener<Void> finalizeListener = new StepListener<Void>();
        this.cancellableThreads.executeIO(() -> this.recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
        finalizeListener.whenComplete(r -> {
            RecoverySourceHandler.runUnderPrimaryPermit(() -> this.shard.updateGlobalCheckpointForShard(this.request.targetAllocationId(), globalCheckpoint), this.shardId + " updating " + this.request.targetAllocationId() + "'s global checkpoint", this.shard, this.cancellableThreads, this.logger);
            if (this.request.isPrimaryRelocation()) {
                this.logger.trace("performing relocation hand-off");
                this.cancellableThreads.execute(() -> this.shard.relocated(this.recoveryTarget::handoffPrimaryContext));
            }
            stopWatch.stop();
            this.logger.trace("finalizing recovery took [{}]", (Object)stopWatch.totalTime());
            listener.onResponse(null);
        }, listener::onFailure);
    }

    public void cancel(String reason) {
        this.cancellableThreads.cancel(reason);
    }

    public String toString() {
        return "ShardRecoveryHandler{shardId=" + this.request.shardId() + ", sourceNode=" + this.request.sourceNode() + ", targetNode=" + this.request.targetNode() + '}';
    }

    void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
        ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length));
        LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(-1L, -1L);
        AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<Tuple<StoreFileMetaData, Exception>>();
        byte[] buffer = new byte[this.chunkSizeInBytes];
        block12: for (StoreFileMetaData md : files) {
            if (error.get() != null) break;
            try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
                 InputStreamIndexInput in = new InputStreamIndexInput(indexInput, md.length());){
                int bytesRead;
                long position = 0L;
                while ((bytesRead = ((InputStream)in).read(buffer, 0, buffer.length)) != -1) {
                    BytesArray content = new BytesArray(buffer, 0, bytesRead);
                    boolean lastChunk = position + (long)content.length() == md.length();
                    long requestSeqId = requestSeqIdTracker.generateSeqNo();
                    this.cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - (long)this.maxConcurrentFileChunks));
                    this.cancellableThreads.checkForCancel();
                    if (error.get() != null) {
                        continue block12;
                    }
                    long requestFilePosition = position;
                    this.cancellableThreads.executeIO(() -> this.recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, (Integer)translogOps.get(), ActionListener.wrap(r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId), e -> {
                        error.compareAndSet(null, Tuple.tuple(md, e));
                        requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                    })));
                    position += (long)content.length();
                }
            }
            catch (Exception e) {
                error.compareAndSet(null, Tuple.tuple(md, e));
                break;
            }
        }
        if (error.get() == null) {
            this.cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
        }
        if (error.get() != null) {
            this.handleErrorOnSendFiles(store, (StoreFileMetaData)((Tuple)error.get()).v1(), (Exception)((Tuple)error.get()).v2());
        }
    }

    private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
        IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
        if (corruptIndexException != null) {
            if (!store.checkIntegrityNoException(md)) {
                this.logger.warn("{} Corrupted file detected {} checksum mismatch", (Object)this.shardId, (Object)md);
                this.failEngine(corruptIndexException);
                throw corruptIndexException;
            }
            RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
            exception.addSuppressed(e);
            this.logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK", new Object[]{this.shardId, this.request.targetNode(), md}), (Throwable)corruptIndexException);
            throw exception;
        }
        throw e;
    }

    protected void failEngine(IOException cause) {
        this.shard.failShard("recovery", cause);
    }

    static final class SendSnapshotResult {
        final long targetLocalCheckpoint;
        final int totalOperations;
        final TimeValue tookTime;

        SendSnapshotResult(long targetLocalCheckpoint, int totalOperations, TimeValue tookTime) {
            this.targetLocalCheckpoint = targetLocalCheckpoint;
            this.totalOperations = totalOperations;
            this.tookTime = tookTime;
        }
    }

    static final class SendFileResult {
        final List<String> phase1FileNames;
        final List<Long> phase1FileSizes;
        final long totalSize;
        final List<String> phase1ExistingFileNames;
        final List<Long> phase1ExistingFileSizes;
        final long existingTotalSize;
        final TimeValue took;
        static final SendFileResult EMPTY = new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(), Collections.emptyList(), 0L, TimeValue.ZERO);

        SendFileResult(List<String> phase1FileNames, List<Long> phase1FileSizes, long totalSize, List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, long existingTotalSize, TimeValue took) {
            this.phase1FileNames = phase1FileNames;
            this.phase1FileSizes = phase1FileSizes;
            this.totalSize = totalSize;
            this.phase1ExistingFileNames = phase1ExistingFileNames;
            this.phase1ExistingFileSizes = phase1ExistingFileSizes;
            this.existingTotalSize = existingTotalSize;
            this.took = took;
        }
    }
}

