/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import net.nmoncho.shaded.com.google.common.base.Preconditions;
import net.nmoncho.shaded.com.google.common.util.concurrent.RateLimiter;
import net.nmoncho.shaded.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.exceptions.CDCWriteException;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.DirectorySizeCalculator;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogSegmentManagerCDC
extends AbstractCommitLogSegmentManager {
    static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManagerCDC.class);
    private final CDCSizeTracker cdcSizeTracker = new CDCSizeTracker(this, new File(DatabaseDescriptor.getCDCLogLocation()));

    public CommitLogSegmentManagerCDC(CommitLog commitLog, String storageDirectory) {
        super(commitLog, storageDirectory);
    }

    @Override
    void start() {
        this.cdcSizeTracker.start();
        super.start();
    }

    @Override
    public void discard(CommitLogSegment segment, boolean delete) {
        segment.close();
        this.addSize(-segment.onDiskSize());
        this.cdcSizeTracker.processDiscardedSegment(segment);
        if (delete) {
            segment.logFile.delete();
        }
        if (segment.getCDCState() != CommitLogSegment.CDCState.CONTAINS) {
            File cdcLink = segment.getCDCFile();
            File cdcIndexFile = segment.getCDCIndexFile();
            this.deleteCDCFiles(cdcLink, cdcIndexFile);
        }
    }

    public long deleteOldLinkedCDCCommitLogSegment(long bytesToFree) {
        if (bytesToFree <= 0L) {
            return 0L;
        }
        File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
        Preconditions.checkState(cdcDir.isDirectory(), "The CDC directory does not exist.");
        File[] files = cdcDir.tryList(f -> CommitLogDescriptor.isValid(f.name()));
        if (files == null || files.length == 0) {
            logger.warn("Skip deleting due to no CDC commit log segments found.");
            return 0L;
        }
        List sorted = Arrays.stream(files).sorted(new CommitLogSegment.CommitLogSegmentFileComparator()).collect(Collectors.toList());
        long bytesDeleted = 0L;
        long bytesRemaining = 0L;
        boolean deletionCompleted = false;
        for (File linkedCdcFile : sorted) {
            if (!deletionCompleted) {
                boolean bl = deletionCompleted = bytesDeleted >= bytesToFree || linkedCdcFile.equals(this.allocatingFrom().getCDCFile());
            }
            if (deletionCompleted) {
                bytesRemaining += linkedCdcFile.length();
                continue;
            }
            File cdcIndexFile = CommitLogDescriptor.inferCdcIndexFile(linkedCdcFile);
            bytesDeleted += this.deleteCDCFiles(linkedCdcFile, cdcIndexFile);
        }
        return bytesRemaining;
    }

    private long deleteCDCFiles(File cdcLink, File cdcIndexFile) {
        long total = 0L;
        if (cdcLink != null && cdcLink.exists()) {
            total += cdcLink.length();
            cdcLink.delete();
        }
        if (cdcIndexFile != null && cdcIndexFile.exists()) {
            total += cdcIndexFile.length();
            cdcIndexFile.delete();
        }
        return total;
    }

    @Override
    public void shutdown() {
        this.cdcSizeTracker.shutdown();
        super.shutdown();
    }

    @Override
    public CommitLogSegment.Allocation allocate(Mutation mutation, int size) throws CDCWriteException {
        CommitLogSegment.Allocation alloc;
        CommitLogSegment segment = this.allocatingFrom();
        this.permitSegmentMaybe(segment);
        this.throwIfForbidden(mutation, segment);
        while (null == (alloc = segment.allocate(mutation, size))) {
            this.advanceAllocatingFrom(segment);
            segment = this.allocatingFrom();
            this.permitSegmentMaybe(segment);
            this.throwIfForbidden(mutation, segment);
        }
        if (mutation.trackedByCDC()) {
            segment.setCDCState(CommitLogSegment.CDCState.CONTAINS);
        }
        return alloc;
    }

    private void permitSegmentMaybe(CommitLogSegment segment) {
        CommitLogSegment.CDCState oldState;
        if (segment.getCDCState() != CommitLogSegment.CDCState.FORBIDDEN) {
            return;
        }
        if (!(DatabaseDescriptor.getCDCBlockWrites() && this.cdcSizeTracker.sizeInProgress.get() + (long)DatabaseDescriptor.getCommitLogSegmentSize() >= DatabaseDescriptor.getCDCTotalSpace() || (oldState = segment.setCDCState(CommitLogSegment.CDCState.PERMITTED)) != CommitLogSegment.CDCState.FORBIDDEN)) {
            FileUtils.createHardLink(segment.logFile, segment.getCDCFile());
            this.cdcSizeTracker.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
        }
    }

    private void throwIfForbidden(Mutation mutation, CommitLogSegment segment) throws CDCWriteException {
        if (mutation.trackedByCDC() && segment.getCDCState() == CommitLogSegment.CDCState.FORBIDDEN) {
            this.cdcSizeTracker.submitOverflowSizeRecalculation();
            String logMsg = String.format("Rejecting mutation to keyspace %s. Free up space in %s by processing CDC logs. Total CDC bytes on disk is %s.", mutation.getKeyspaceName(), DatabaseDescriptor.getCDCLogLocation(), this.cdcSizeTracker.sizeInProgress.get());
            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 10L, TimeUnit.SECONDS, logMsg, new Object[0]);
            throw new CDCWriteException(logMsg);
        }
    }

    @Override
    public CommitLogSegment createSegment() {
        CommitLogSegment segment = CommitLogSegment.createSegment(this.commitLog, this);
        this.cdcSizeTracker.processNewSegment(segment);
        if (segment.getCDCState() == CommitLogSegment.CDCState.PERMITTED) {
            FileUtils.createHardLink(segment.logFile, segment.getCDCFile());
        }
        return segment;
    }

    @Override
    void handleReplayedSegment(File file) {
        super.handleReplayedSegment(file);
        File cdcFile = new File(DatabaseDescriptor.getCDCLogLocation(), file.name());
        File cdcIndexFile = new File(DatabaseDescriptor.getCDCLogLocation(), CommitLogDescriptor.fromFileName(file.name()).cdcIndexFileName());
        if (cdcFile.exists() && !cdcIndexFile.exists()) {
            logger.trace("(Unopened) CDC segment {} is no longer needed and will be deleted now", (Object)cdcFile);
            cdcFile.delete();
        }
    }

    public void addCDCSize(long size) {
        this.cdcSizeTracker.addSize(size);
    }

    @VisibleForTesting
    public long updateCDCTotalSize() {
        long sleepTime = (long)DatabaseDescriptor.getCDCDiskCheckInterval() + 50L;
        Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
        this.cdcSizeTracker.submitOverflowSizeRecalculation();
        Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
        if (this.allocatingFrom().getCDCState() == CommitLogSegment.CDCState.FORBIDDEN) {
            this.cdcSizeTracker.processNewSegment(this.allocatingFrom());
        }
        return this.cdcSizeTracker.getAllocatedSize();
    }

    private static class CDCSizeTracker
    extends DirectorySizeCalculator {
        private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / (double)DatabaseDescriptor.getCDCDiskCheckInterval());
        private ExecutorService cdcSizeCalculationExecutor;
        private final CommitLogSegmentManagerCDC segmentManager;
        private final AtomicLong sizeInProgress;
        private final File path;

        CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path) {
            this.path = path;
            this.segmentManager = segmentManager;
            this.sizeInProgress = new AtomicLong(0L);
        }

        public void start() {
            this.sizeInProgress.getAndSet(0L);
            this.cdcSizeCalculationExecutor = ExecutorFactory.Global.executorFactory().configureSequential("CDCSizeCalculationExecutor").withRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()).withQueueLimit(0).withKeepAlive(1000L, TimeUnit.SECONDS).build();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void processNewSegment(CommitLogSegment segment) {
            int segmentSize = this.defaultSegmentSize();
            long allowance = DatabaseDescriptor.getCDCTotalSpace();
            boolean blocking = DatabaseDescriptor.getCDCBlockWrites();
            Object object = segment.cdcStateLock;
            synchronized (object) {
                segment.setCDCState(blocking && (long)segmentSize + this.sizeInProgress.get() > allowance ? CommitLogSegment.CDCState.FORBIDDEN : CommitLogSegment.CDCState.PERMITTED);
                if (segment.getCDCState() == CommitLogSegment.CDCState.PERMITTED) {
                    this.addSize(segmentSize);
                }
            }
            if (!blocking && this.sizeInProgress.get() > allowance) {
                long bytesToFree = this.sizeInProgress.get() - allowance;
                long remainingSize = this.segmentManager.deleteOldLinkedCDCCommitLogSegment(bytesToFree);
                long releasedSize = this.sizeInProgress.get() - remainingSize;
                this.sizeInProgress.getAndSet(remainingSize);
                logger.debug("Freed up {} ({}) bytes after deleting the oldest CDC commit log segments in non-blocking mode. Total on-disk CDC size: {}; allowed CDC size: {}", new Object[]{releasedSize, bytesToFree, remainingSize, allowance});
            }
            this.submitOverflowSizeRecalculation();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void processDiscardedSegment(CommitLogSegment segment) {
            if (!segment.getCDCFile().exists()) {
                logger.debug("Not processing discarded CommitLogSegment {}; this segment appears to have been deleted already.", (Object)segment);
                return;
            }
            Object object = segment.cdcStateLock;
            synchronized (object) {
                if (segment.getCDCState() == CommitLogSegment.CDCState.CONTAINS) {
                    this.addSize(segment.onDiskSize());
                }
                if (segment.getCDCState() != CommitLogSegment.CDCState.FORBIDDEN) {
                    this.addSize(-this.defaultSegmentSize());
                }
            }
            this.submitOverflowSizeRecalculation();
        }

        public void submitOverflowSizeRecalculation() {
            try {
                this.cdcSizeCalculationExecutor.submit(() -> {
                    this.rateLimiter.acquire();
                    this.calculateSize();
                });
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
        }

        private int defaultSegmentSize() {
            return DatabaseDescriptor.getCommitLogSegmentSize();
        }

        private void calculateSize() {
            try {
                this.resetSize();
                Files.walkFileTree(this.path.toPath(), this);
                this.sizeInProgress.getAndSet(this.getAllocatedSize());
            }
            catch (IOException ie) {
                CommitLog.handleCommitError("Failed CDC Size Calculation", ie);
            }
        }

        public void shutdown() {
            if (this.cdcSizeCalculationExecutor != null && !this.cdcSizeCalculationExecutor.isShutdown()) {
                this.cdcSizeCalculationExecutor.shutdown();
            }
        }

        private void addSize(long toAdd) {
            this.sizeInProgress.getAndAdd(toAdd);
        }
    }
}

