/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.repair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import net.nmoncho.shaded.com.google.common.base.Preconditions;
import net.nmoncho.shaded.com.google.common.collect.ImmutableMap;
import net.nmoncho.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.AsymmetricRemoteSyncTask;
import org.apache.cassandra.repair.CompletableRemoteSyncTask;
import org.apache.cassandra.repair.LocalSyncTask;
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairResult;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.SnapshotTask;
import org.apache.cassandra.repair.SymmetricRemoteSyncTask;
import org.apache.cassandra.repair.SyncStat;
import org.apache.cassandra.repair.SyncTask;
import org.apache.cassandra.repair.TreeResponse;
import org.apache.cassandra.repair.ValidationTask;
import org.apache.cassandra.repair.asymmetric.DifferenceHolder;
import org.apache.cassandra.repair.asymmetric.HostDifferences;
import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
import org.apache.cassandra.repair.asymmetric.ReduceHelper;
import org.apache.cassandra.repair.state.JobState;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.paxos.Paxos;
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanup;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RepairJob
extends AsyncFuture<RepairResult>
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(RepairJob.class);
    public final JobState state;
    private final RepairJobDesc desc;
    private final RepairSession session;
    private final RepairParallelism parallelismDegree;
    private final ExecutorPlus taskExecutor;
    @VisibleForTesting
    final List<ValidationTask> validationTasks = new CopyOnWriteArrayList<ValidationTask>();
    @VisibleForTesting
    final List<SyncTask> syncTasks = new CopyOnWriteArrayList<SyncTask>();

    public RepairJob(RepairSession session, String columnFamily) {
        this.session = session;
        this.taskExecutor = session.taskExecutor;
        this.parallelismDegree = session.parallelismDegree;
        this.desc = new RepairJobDesc(session.state.parentRepairSession, session.getId(), session.state.keyspace, columnFamily, session.state.commonRange.ranges);
        this.state = new JobState(this.desc, session.state.commonRange.endpoints);
    }

    public int getNowInSeconds() {
        int nowInSeconds = FBUtilities.nowInSeconds();
        if (this.session.previewKind == PreviewKind.REPAIRED) {
            return nowInSeconds + DatabaseDescriptor.getValidationPreviewPurgeHeadStartInSec();
        }
        return nowInSeconds;
    }

    @Override
    public void run() {
        AsyncFuture paxosRepair;
        this.state.phase.start();
        Keyspace ks = Keyspace.open(this.desc.keyspace);
        final ColumnFamilyStore cfs = ks.getColumnFamilyStore(this.desc.columnFamily);
        cfs.metric.repairsStarted.inc();
        ArrayList<InetAddressAndPort> allEndpoints = new ArrayList<InetAddressAndPort>(this.session.state.commonRange.endpoints);
        allEndpoints.add(FBUtilities.getBroadcastAddressAndPort());
        if (DatabaseDescriptor.paxosRepairEnabled() && (Paxos.useV2() && this.session.repairPaxos || this.session.paxosOnly)) {
            logger.info("{} {}.{} starting paxos repair", new Object[]{this.session.previewKind.logPrefix(this.session.getId()), this.desc.keyspace, this.desc.columnFamily});
            TableMetadata metadata = Schema.instance.getTableMetadata(this.desc.keyspace, this.desc.columnFamily);
            paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, this.desc.ranges, this.session.state.commonRange.hasSkippedReplicas, this.taskExecutor);
        } else {
            logger.info("{} {}.{} not running paxos repair", new Object[]{this.session.previewKind.logPrefix(this.session.getId()), this.desc.keyspace, this.desc.columnFamily});
            paxosRepair = ImmediateFuture.success(null);
        }
        if (this.session.paxosOnly) {
            paxosRepair.addCallback((FutureCallback<Object>)new FutureCallback<Void>(){

                @Override
                public void onSuccess(Void v) {
                    logger.info("{} {}.{} paxos repair completed", new Object[]{((RepairJob)RepairJob.this).session.previewKind.logPrefix(RepairJob.this.session.getId()), ((RepairJob)RepairJob.this).desc.keyspace, ((RepairJob)RepairJob.this).desc.columnFamily});
                    RepairJob.this.trySuccess(new RepairResult(RepairJob.this.desc, Collections.emptyList()));
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.warn("{} {}.{} paxos repair failed", new Object[]{((RepairJob)RepairJob.this).session.previewKind.logPrefix(RepairJob.this.session.getId()), ((RepairJob)RepairJob.this).desc.keyspace, ((RepairJob)RepairJob.this).desc.columnFamily});
                    RepairJob.this.tryFailure(t);
                }
            }, (Executor)this.taskExecutor);
            return;
        }
        Future<List<Object>> allSnapshotTasks = this.parallelismDegree != RepairParallelism.PARALLEL ? (this.session.isIncremental ? paxosRepair.map(input -> allEndpoints) : paxosRepair.flatMap(input -> {
            ArrayList<SnapshotTask> snapshotTasks = new ArrayList<SnapshotTask>(allEndpoints.size());
            this.state.phase.snapshotsSubmitted();
            for (InetAddressAndPort endpoint : allEndpoints) {
                SnapshotTask snapshotTask = new SnapshotTask(this.desc, endpoint);
                snapshotTasks.add(snapshotTask);
                this.taskExecutor.execute(snapshotTask);
            }
            return FutureCombiner.allOf(snapshotTasks).map(a -> {
                this.state.phase.snapshotsCompleted();
                return a;
            });
        })) : null;
        Future syncResults = this.session.validationScheduler.schedule(() -> this.createSyncTasks(paxosRepair, allSnapshotTasks, allEndpoints), (Executor)this.taskExecutor).flatMap(this::executeTasks, this.taskExecutor);
        syncResults.addCallback(new FutureCallback<List<SyncStat>>(){

            @Override
            public void onSuccess(List<SyncStat> stats) {
                RepairJob.this.state.phase.success();
                if (!((RepairJob)RepairJob.this).session.previewKind.isPreview()) {
                    logger.info("{} {}.{} is fully synced", new Object[]{((RepairJob)RepairJob.this).session.previewKind.logPrefix(RepairJob.this.session.getId()), ((RepairJob)RepairJob.this).desc.keyspace, ((RepairJob)RepairJob.this).desc.columnFamily});
                    SystemDistributedKeyspace.successfulRepairJob(RepairJob.this.session.getId(), ((RepairJob)RepairJob.this).desc.keyspace, ((RepairJob)RepairJob.this).desc.columnFamily);
                }
                cfs.metric.repairsCompleted.inc();
                RepairJob.this.trySuccess(new RepairResult(RepairJob.this.desc, stats));
            }

            @Override
            public void onFailure(Throwable t) {
                RepairJob.this.state.phase.fail(t);
                RepairJob.this.validationTasks.forEach(ValidationTask::abort);
                RepairJob.this.syncTasks.forEach(SyncTask::abort);
                if (!((RepairJob)RepairJob.this).session.previewKind.isPreview()) {
                    logger.warn("{} {}.{} sync failed", new Object[]{((RepairJob)RepairJob.this).session.previewKind.logPrefix(RepairJob.this.session.getId()), ((RepairJob)RepairJob.this).desc.keyspace, ((RepairJob)RepairJob.this).desc.columnFamily});
                    SystemDistributedKeyspace.failedRepairJob(RepairJob.this.session.getId(), ((RepairJob)RepairJob.this).desc.keyspace, ((RepairJob)RepairJob.this).desc.columnFamily, t);
                }
                cfs.metric.repairsCompleted.inc();
                RepairJob.this.tryFailure(t instanceof NoSuchRepairSessionExceptionWrapper ? ((NoSuchRepairSessionExceptionWrapper)t).wrapped : t);
            }
        }, (Executor)this.taskExecutor);
    }

    private Future<List<SyncTask>> createSyncTasks(Future<Void> paxosRepair, Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints) {
        Future<Object> treeResponses = allSnapshotTasks != null ? allSnapshotTasks.flatMap(endpoints -> {
            if (this.parallelismDegree == RepairParallelism.SEQUENTIAL) {
                return this.sendSequentialValidationRequest(allEndpoints);
            }
            return this.sendDCAwareValidationRequest(allEndpoints);
        }, this.taskExecutor) : paxosRepair.flatMap(input -> this.sendValidationRequest(allEndpoints));
        treeResponses = treeResponses.map(a -> {
            this.state.phase.validationCompleted();
            return a;
        });
        return treeResponses.map(this.session.optimiseStreams && !this.session.pullRepair ? this::createOptimisedSyncingSyncTasks : this::createStandardSyncTasks, this.taskExecutor);
    }

    private boolean isTransient(InetAddressAndPort ep) {
        return this.session.state.commonRange.transEndpoints.contains(ep);
    }

    private List<SyncTask> createStandardSyncTasks(List<TreeResponse> trees) {
        return RepairJob.createStandardSyncTasks(this.desc, trees, FBUtilities.getLocalAddressAndPort(), this::isTransient, this.session.isIncremental, this.session.pullRepair, this.session.previewKind);
    }

    @VisibleForTesting
    static List<SyncTask> createStandardSyncTasks(RepairJobDesc desc, List<TreeResponse> trees, InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, boolean isIncremental, boolean pullRepair, PreviewKind previewKind) {
        long startedAt = Clock.Global.currentTimeMillis();
        ArrayList<SyncTask> syncTasks = new ArrayList<SyncTask>();
        for (int i = 0; i < trees.size() - 1; ++i) {
            TreeResponse r1 = trees.get(i);
            for (int j = i + 1; j < trees.size(); ++j) {
                SyncTask task;
                List<Range<Token>> differences;
                TreeResponse r2 = trees.get(j);
                if (isTransient.test(r1.endpoint) && isTransient.test(r2.endpoint) || (differences = MerkleTrees.difference(r1.trees, r2.trees)).isEmpty()) continue;
                if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) {
                    boolean transferRanges;
                    TreeResponse self = r1.endpoint.equals(local) ? r1 : r2;
                    TreeResponse remote = r2.endpoint.equals(local) ? r1 : r2;
                    boolean requestRanges = !isTransient.test(self.endpoint);
                    boolean bl = transferRanges = !isTransient.test(remote.endpoint) && !pullRepair;
                    if (!requestRanges && !transferRanges) continue;
                    task = new LocalSyncTask(desc, self.endpoint, remote.endpoint, differences, isIncremental ? desc.parentSessionId : null, requestRanges, transferRanges, previewKind);
                } else if (isTransient.test(r1.endpoint) || isTransient.test(r2.endpoint)) {
                    TreeResponse streamFrom = isTransient.test(r1.endpoint) ? r1 : r2;
                    TreeResponse streamTo = isTransient.test(r1.endpoint) ? r2 : r1;
                    task = new AsymmetricRemoteSyncTask(desc, streamTo.endpoint, streamFrom.endpoint, differences, previewKind);
                } else {
                    task = new SymmetricRemoteSyncTask(desc, r1.endpoint, r2.endpoint, differences, previewKind);
                }
                syncTasks.add(task);
            }
            trees.get((int)i).trees.release();
        }
        trees.get((int)(trees.size() - 1)).trees.release();
        logger.info("Created {} sync tasks based on {} merkle tree responses for {} (took: {}ms)", new Object[]{syncTasks.size(), trees.size(), desc.parentSessionId, Clock.Global.currentTimeMillis() - startedAt});
        return syncTasks;
    }

    @VisibleForTesting
    Future<List<SyncStat>> executeTasks(List<SyncTask> tasks) {
        try {
            ActiveRepairService.instance.getParentRepairSession(this.desc.parentSessionId);
            this.syncTasks.addAll(tasks);
            for (SyncTask task : tasks) {
                if (!task.isLocal()) {
                    this.session.trackSyncCompletion(Pair.create(this.desc, task.nodePair()), (CompletableRemoteSyncTask)((Object)task));
                }
                this.taskExecutor.execute(task);
            }
            return FutureCombiner.allOf(tasks);
        }
        catch (NoSuchRepairSessionException e) {
            return ImmediateFuture.failure(new NoSuchRepairSessionExceptionWrapper(e));
        }
    }

    private List<SyncTask> createOptimisedSyncingSyncTasks(List<TreeResponse> trees) {
        return RepairJob.createOptimisedSyncingSyncTasks(this.desc, trees, FBUtilities.getLocalAddressAndPort(), this::isTransient, this::getDC, this.session.isIncremental, this.session.previewKind);
    }

    static List<SyncTask> createOptimisedSyncingSyncTasks(RepairJobDesc desc, List<TreeResponse> trees, InetAddressAndPort local, Predicate<InetAddressAndPort> isTransient, Function<InetAddressAndPort, String> getDC, boolean isIncremental, PreviewKind previewKind) {
        long startedAt = Clock.Global.currentTimeMillis();
        ArrayList<SyncTask> syncTasks = new ArrayList<SyncTask>();
        DifferenceHolder diffHolder = new DifferenceHolder(trees);
        logger.trace("diffs = {}", (Object)diffHolder);
        PreferedNodeFilter preferSameDCFilter = (streaming, candidates) -> candidates.stream().filter(node -> ((String)getDC.apply(streaming)).equals(getDC.apply((InetAddressAndPort)node))).collect(Collectors.toSet());
        ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
        for (int i = 0; i < trees.size(); ++i) {
            InetAddressAndPort address = trees.get((int)i).endpoint;
            if (isTransient.test(address)) continue;
            HostDifferences streamsFor = reducedDifferences.get(address);
            if (streamsFor != null) {
                Preconditions.checkArgument(streamsFor.get(address).isEmpty(), "We should not fetch ranges from ourselves");
                for (InetAddressAndPort fetchFrom : streamsFor.hosts()) {
                    ArrayList<Range<Token>> toFetch = new ArrayList<Range<Token>>(streamsFor.get(fetchFrom));
                    assert (!toFetch.isEmpty());
                    logger.trace("{} is about to fetch {} from {}", new Object[]{address, toFetch, fetchFrom});
                    SyncTask task = address.equals(local) ? new LocalSyncTask(desc, address, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, true, false, previewKind) : new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind);
                    syncTasks.add(task);
                }
                continue;
            }
            logger.trace("Node {} has nothing to stream", (Object)address);
        }
        logger.info("Created {} optimised sync tasks based on {} merkle tree responses for {} (took: {}ms)", new Object[]{syncTasks.size(), trees.size(), desc.parentSessionId, Clock.Global.currentTimeMillis() - startedAt});
        logger.trace("Optimised sync tasks for {}: {}", (Object)desc.parentSessionId, syncTasks);
        return syncTasks;
    }

    private String getDC(InetAddressAndPort address) {
        return DatabaseDescriptor.getEndpointSnitch().getDatacenter(address);
    }

    private Future<List<TreeResponse>> sendValidationRequest(Collection<InetAddressAndPort> endpoints) {
        this.state.phase.validationSubmitted();
        String message = String.format("Requesting merkle trees for %s (to %s)", this.desc.columnFamily, endpoints);
        logger.info("{} {}", (Object)this.session.previewKind.logPrefix(this.desc.sessionId), (Object)message);
        Tracing.traceRepair(message, new Object[0]);
        int nowInSec = this.getNowInSeconds();
        ArrayList<ValidationTask> tasks = new ArrayList<ValidationTask>(endpoints.size());
        for (InetAddressAndPort endpoint : endpoints) {
            ValidationTask task = this.newValidationTask(endpoint, nowInSec);
            tasks.add(task);
            this.session.trackValidationCompletion(Pair.create(this.desc, endpoint), task);
            this.taskExecutor.execute(task);
        }
        return FutureCombiner.allOf(tasks);
    }

    private Future<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddressAndPort> endpoints) {
        this.state.phase.validationSubmitted();
        String message = String.format("Requesting merkle trees for %s (to %s)", this.desc.columnFamily, endpoints);
        logger.info("{} {}", (Object)this.session.previewKind.logPrefix(this.desc.sessionId), (Object)message);
        Tracing.traceRepair(message, new Object[0]);
        int nowInSec = this.getNowInSeconds();
        ArrayList<ValidationTask> tasks = new ArrayList<ValidationTask>(endpoints.size());
        LinkedList<InetAddressAndPort> requests = new LinkedList<InetAddressAndPort>(endpoints);
        InetAddressAndPort address = (InetAddressAndPort)requests.poll();
        ValidationTask firstTask = this.newValidationTask(address, nowInSec);
        logger.info("{} Validating {}", (Object)this.session.previewKind.logPrefix(this.desc.sessionId), (Object)address);
        this.session.trackValidationCompletion(Pair.create(this.desc, address), firstTask);
        tasks.add(firstTask);
        ValidationTask currentTask = firstTask;
        while (requests.size() > 0) {
            final InetAddressAndPort nextAddress = (InetAddressAndPort)requests.poll();
            final ValidationTask nextTask = this.newValidationTask(nextAddress, nowInSec);
            tasks.add(nextTask);
            currentTask.addCallback(new FutureCallback<TreeResponse>(){

                @Override
                public void onSuccess(TreeResponse result) {
                    logger.info("{} Validating {}", (Object)((RepairJob)RepairJob.this).session.previewKind.logPrefix(((RepairJob)RepairJob.this).desc.sessionId), (Object)nextAddress);
                    RepairJob.this.session.trackValidationCompletion(Pair.create(RepairJob.this.desc, nextAddress), nextTask);
                    RepairJob.this.taskExecutor.execute(nextTask);
                }

                @Override
                public void onFailure(Throwable t) {
                }
            });
            currentTask = nextTask;
        }
        this.taskExecutor.execute(firstTask);
        return FutureCombiner.allOf(tasks);
    }

    private Future<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddressAndPort> endpoints) {
        this.state.phase.validationSubmitted();
        String message = String.format("Requesting merkle trees for %s (to %s)", this.desc.columnFamily, endpoints);
        logger.info("{} {}", (Object)this.session.previewKind.logPrefix(this.desc.sessionId), (Object)message);
        Tracing.traceRepair(message, new Object[0]);
        int nowInSec = this.getNowInSeconds();
        ArrayList<ValidationTask> tasks = new ArrayList<ValidationTask>(endpoints.size());
        HashMap<String, Queue> requestsByDatacenter = new HashMap<String, Queue>();
        for (InetAddressAndPort inetAddressAndPort : endpoints) {
            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(inetAddressAndPort);
            Queue queue = requestsByDatacenter.computeIfAbsent(dc, k -> new LinkedList());
            queue.add(inetAddressAndPort);
        }
        for (Map.Entry entry : requestsByDatacenter.entrySet()) {
            Queue requests = (Queue)entry.getValue();
            InetAddressAndPort address = (InetAddressAndPort)requests.poll();
            ValidationTask firstTask = this.newValidationTask(address, nowInSec);
            logger.info("{} Validating {}", (Object)this.session.previewKind.logPrefix(this.session.getId()), (Object)address);
            this.session.trackValidationCompletion(Pair.create(this.desc, address), firstTask);
            tasks.add(firstTask);
            ValidationTask currentTask = firstTask;
            while (requests.size() > 0) {
                final InetAddressAndPort nextAddress = (InetAddressAndPort)requests.poll();
                final ValidationTask nextTask = this.newValidationTask(nextAddress, nowInSec);
                tasks.add(nextTask);
                currentTask.addCallback(new FutureCallback<TreeResponse>(){

                    @Override
                    public void onSuccess(TreeResponse result) {
                        logger.info("{} Validating {}", (Object)((RepairJob)RepairJob.this).session.previewKind.logPrefix(RepairJob.this.session.getId()), (Object)nextAddress);
                        RepairJob.this.session.trackValidationCompletion(Pair.create(RepairJob.this.desc, nextAddress), nextTask);
                        RepairJob.this.taskExecutor.execute(nextTask);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                    }
                });
                currentTask = nextTask;
            }
            this.taskExecutor.execute(firstTask);
        }
        return FutureCombiner.allOf(tasks);
    }

    private ValidationTask newValidationTask(InetAddressAndPort endpoint, int nowInSec) {
        ValidationTask task = new ValidationTask(this.desc, endpoint, nowInSec, this.session.previewKind);
        this.validationTasks.add(task);
        return task;
    }

    private static class NoSuchRepairSessionExceptionWrapper
    extends RuntimeException {
        private final NoSuchRepairSessionException wrapped;

        private NoSuchRepairSessionExceptionWrapper(NoSuchRepairSessionException wrapped) {
            this.wrapped = wrapped;
        }
    }
}

