/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
import org.apache.cassandra.concurrent.Interruptible;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.SimpleCachedBufferPool;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCommitLogSegmentManager {
    static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);

    /** Segment built ahead of time by the allocator task; {@code null} while none is in reserve. */
    private volatile CommitLogSegment availableSegment = null;

    /** Signalled by the allocator task every time it publishes a new {@link #availableSegment}. */
    private final WaitQueue segmentPrepared = WaitQueue.newWaitQueue();

    /** Segments that have been promoted for writing; each newly promoted segment is appended at the tail. */
    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();

    /** Segment that new allocations are currently served from; {@code null} until the first promotion. */
    private volatile CommitLogSegment allocatingFrom = null;

    final String storageDirectory;

    /** Running total adjusted through {@link #addSize(long)} and compared against the configured space limit. */
    private final AtomicLong size = new AtomicLong();

    /** Handle on the allocator loop; assigned in {@link #start()}. */
    @VisibleForTesting
    Interruptible executor;

    protected final CommitLog commitLog;

    /** True when no segment is in reserve and the buffer pool is not at its limit. */
    private final BooleanSupplier managerThreadWaitCondition = () -> this.availableSegment == null && !this.atSegmentBufferLimit();

    /** Queue the allocator task parks on between iterations; signalled by {@link #wakeManager()}. */
    private final WaitQueue managerThreadWaitQueue = WaitQueue.newWaitQueue();

    /** Buffer pool created in {@link #start()}; {@code null} before that. */
    private volatile SimpleCachedBufferPool bufferPool;

    /**
     * Records the owning commit log and the directory name; no threads or buffers are created
     * until {@link #start()} is called.
     *
     * @param commitLog        the commit log this manager serves
     * @param storageDirectory the storage directory associated with this manager
     */
    AbstractCommitLogSegmentManager(CommitLog commitLog, String storageDirectory) {
        this.storageDirectory = storageDirectory;
        this.commitLog = commitLog;
    }

    /**
     * Creates the buffer pool, launches the allocator loop and then blocks until the first
     * segment has been promoted to {@code allocatingFrom}.
     */
    void start() {
        // Encrypted or uncompressed logs use on-heap buffers; otherwise the compressor chooses.
        BufferType poolBufferType;
        if (commitLog.configuration.useEncryption() || !commitLog.configuration.useCompression()) {
            poolBufferType = BufferType.ON_HEAP;
        } else {
            poolBufferType = commitLog.configuration.getCompressor().preferredBufferType();
        }
        bufferPool = new SimpleCachedBufferPool(
                DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(),
                DatabaseDescriptor.getCommitLogSegmentSize(),
                poolBufferType);

        AllocatorRunnable allocatorTask = new AllocatorRunnable();
        executor = ExecutorFactory.Global.executorFactory().infiniteLoop(
                "COMMIT-LOG-ALLOCATOR",
                allocatorTask,
                InfiniteLoopExecutor.SimulatorSafe.SAFE,
                InfiniteLoopExecutor.Daemon.NON_DAEMON,
                InfiniteLoopExecutor.Interrupts.SYNCHRONIZED);

        // Nothing is being allocated from yet, so the "old" segment is null.
        advanceAllocatingFrom(null);
    }

    /**
     * @return {@code true} only when segments of this commit log use the buffer pool and the
     *         pool reports that it is at its limit
     */
    private boolean atSegmentBufferLimit() {
        if (!CommitLogSegment.usesBufferPool(commitLog)) {
            return false;
        }
        return bufferPool.atLimit();
    }

    /**
     * When the tracked size exceeds the configured total, requests (non-forced) flushes for the
     * oldest active segments, taking just enough of them to cover the overshoot and never
     * including the segment currently being allocated from.
     */
    private void maybeFlushToReclaim() {
        long headroom = unusedCapacity();
        if (headroom >= 0L) {
            return;
        }

        long reclaimable = 0L;
        ArrayList<CommitLogSegment> candidates = new ArrayList<>();
        for (CommitLogSegment segment : activeSegments) {
            if (segment == allocatingFrom) {
                break;
            }
            candidates.add(segment);
            reclaimable += segment.onDiskSize();
            // Stop as soon as the selected segments would bring us back within the limit.
            if (reclaimable + headroom >= 0L) {
                break;
            }
        }
        flushDataFrom(candidates, Collections.emptyList(), false);
    }

    /**
     * Obtains an allocation in a commit log segment for the given mutation.
     * NOTE(review): the second argument is presumably the serialized size of the mutation; the
     * parameter names were lost in decompilation, so confirm against the implementations.
     */
    public abstract CommitLogSegment.Allocation allocate(Mutation var1, int var2);

    /**
     * Builds a new segment. The allocator task calls this and publishes the result as the
     * reserve segment.
     */
    abstract CommitLogSegment createSegment();

    /**
     * Disposes of a segment that is no longer needed.
     * NOTE(review): callers in this class pass either the archiving result or a delete flag as
     * the boolean, so it presumably controls whether the segment file is deleted; confirm.
     */
    abstract void discard(CommitLogSegment var1, boolean var2);

    /**
     * Switches allocation from {@code old} to the reserve segment, waiting for the allocator
     * task to supply one if none is ready. Returns immediately, without side effects, if
     * another thread has already moved {@code allocatingFrom} away from {@code old}.
     *
     * @param old the segment the caller believes is current; {@code null} on first use
     */
    void advanceAllocatingFrom(CommitLogSegment old) {
        while (true) {
            // The swap happens under the manager's monitor so that allocatingFrom and
            // activeSegments are updated together.
            synchronized (this) {
                if (allocatingFrom != old) {
                    return;
                }
                if (availableSegment != null) {
                    allocatingFrom = availableSegment;
                    activeSegments.add(allocatingFrom);
                    availableSegment = null;
                    break;
                }
            }
            // Nothing in reserve yet: wait outside the monitor, then retry.
            awaitAvailableSegment(old);
        }

        // The reserve slot is now empty, so let the allocator task build the next segment.
        wakeManager();

        if (old != null) {
            commitLog.archiver.maybeArchive(old);
            old.discardUnusedTail();
        }
        commitLog.requestExtraSync();
    }

    /**
     * Blocks, ignoring interrupts, until either a reserve segment is available or
     * {@code allocatingFrom} is no longer {@code currentAllocatingFrom}. Each registration on
     * the wait queue is timed with the {@code waitingOnSegmentAllocation} metric.
     *
     * @param currentAllocatingFrom the segment the caller observed as current
     */
    void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom) {
        do {
            // Register before re-checking the condition so a signal sent in between is not missed.
            WaitQueue.Signal signal = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time(), Timer.Context::stop);
            if (availableSegment == null && allocatingFrom == currentAllocatingFrom) {
                signal.awaitUninterruptibly();
            } else {
                signal.cancel();
            }
        } while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
    }

    /**
     * Moves allocation to a fresh segment, force-flushes everything that is dirty in the
     * segments that were active on entry, and then discards every active segment that has
     * become unused. Any failure is logged rather than propagated.
     *
     * @param droppedTables tables to treat as dropped: they are marked clean instead of flushed
     */
    void forceRecycleAll(Collection<TableId> droppedTables) {
        ArrayList<CommitLogSegment> snapshot = new ArrayList<>(activeSegments);
        CommitLogSegment newest = snapshot.get(snapshot.size() - 1);

        advanceAllocatingFrom(newest);
        newest.waitForModifications();
        Keyspace.writeOrder.awaitNewBarrier();

        Future<?> pendingFlushes = flushDataFrom(snapshot, droppedTables, true);
        try {
            pendingFlushes.get();

            for (CommitLogSegment segment : activeSegments) {
                for (TableId dropped : droppedTables) {
                    segment.markClean(dropped, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
                }
            }

            for (CommitLogSegment segment : activeSegments) {
                if (segment.isUnused()) {
                    archiveAndDiscard(segment);
                }
            }

            // If the head of the queue is not newer than 'newest', something from the snapshot survived.
            CommitLogSegment oldest = activeSegments.peek();
            if (oldest != null && oldest.id <= newest.id) {
                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
            }
        }
        catch (Throwable t) {
            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
        }
    }

    /**
     * Waits for any archiving of the segment, removes it from the active set and discards it.
     * Does nothing further if the segment was not (or is no longer) in the active set.
     *
     * @param segment the segment to retire
     */
    void archiveAndDiscard(CommitLogSegment segment) {
        boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
        if (activeSegments.remove(segment)) {
            String when = archiveSuccess ? "now" : "by the archive script";
            logger.debug("Segment {} is no longer active and will be deleted {}", segment, when);
            discard(segment, archiveSuccess);
        }
    }

    /**
     * Deletes the file of a segment that is no longer needed.
     *
     * @param file the segment file to delete
     */
    void handleReplayedSegment(File file) {
        logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file);
        FileUtils.deleteWithConfirm(file);
    }

    /**
     * Atomically adjusts the tracked size.
     *
     * @param addedSize the delta to add
     */
    void addSize(long addedSize) {
        size.addAndGet(addedSize);
    }

    /**
     * @return the current value of the size counter maintained through {@link #addSize(long)}
     */
    public long onDiskSize() {
        return size.get();
    }

    /**
     * @return the configured total commit log space (converted from MiB to bytes) minus the
     *         tracked size; negative when the limit has been exceeded
     */
    private long unusedCapacity() {
        long limitBytes = DatabaseDescriptor.getTotalCommitlogSpaceInMiB() * 1024L * 1024L;
        long usedBytes = size.get();
        logger.trace("Total active commitlog segment space used is {} out of {}", usedBytes, limitBytes);
        return limitBytes - usedBytes;
    }

    /**
     * Starts at most one flush per dirty table found in the given segments and returns a future
     * that combines them. Tables that are dropped or unknown to the schema, and tables whose
     * memtable writes are durable, are marked clean on the segment instead of being flushed.
     *
     * @param segments      segments to inspect, oldest first; the last one supplies the flush position
     * @param droppedTables table ids to treat as dropped
     * @param force         {@code true} to flush with reason {@code COMMITLOG_DIRTY};
     *                      {@code false} to flush up to the last segment's current position
     * @return a combined future over the started flushes, or an already-successful future when
     *         {@code segments} is empty
     */
    private Future<?> flushDataFrom(List<CommitLogSegment> segments, Collection<TableId> droppedTables, boolean force) {
        if (segments.isEmpty()) {
            return ImmediateFuture.success(null);
        }

        CommitLogSegment lastSegment = segments.get(segments.size() - 1);
        CommitLogPosition maxCommitLogPosition = lastSegment.getCurrentCommitLogPosition();

        // Insertion-ordered so the flushes are combined in the order they were started.
        LinkedHashMap<TableId, Future<CommitLogPosition>> pendingFlushes = new LinkedHashMap<>();
        for (CommitLogSegment segment : segments) {
            for (TableId dirtyTableId : segment.getDirtyTableIds()) {
                TableMetadata metadata = droppedTables.contains(dirtyTableId)
                        ? null
                        : Schema.instance.getTableMetadata(dirtyTableId);

                if (metadata == null) {
                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyTableId);
                    segment.markClean(dirtyTableId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
                } else if (!pendingFlushes.containsKey(dirtyTableId)) {
                    ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(dirtyTableId);
                    if (cfs.memtableWritesAreDurable()) {
                        segment.markClean(dirtyTableId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
                    } else {
                        pendingFlushes.put(dirtyTableId, force
                                ? cfs.forceFlush(ColumnFamilyStore.FlushReason.COMMITLOG_DIRTY)
                                : cfs.forceFlush(maxCommitLogPosition));
                    }
                }
            }
        }
        return FutureCombiner.allOf(pendingFlushes.values());
    }

    /**
     * Test-only teardown: shuts the manager down, waits up to five minutes for the allocator
     * loop to terminate, then discards every active segment and resets the size counter.
     *
     * @param deleteSegments passed through to {@code discard} for each active segment
     * @throws UncheckedInterruptedException if interrupted while waiting for termination
     */
    @VisibleForTesting
    public void stopUnsafe(boolean deleteSegments) {
        logger.debug("CLSM closing and clearing existing commit log segments...");
        shutdown();

        // The wait must run unconditionally: awaitTermination also closes the active segments
        // and drains the buffer pool, so it cannot live inside an assert expression that is
        // skipped when assertions are disabled.
        boolean terminated;
        try {
            terminated = awaitTermination(5L, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            throw new UncheckedInterruptedException(e);
        }
        assert terminated : "Assert waiting for termination failed on " + FBUtilities.now().toString();

        for (CommitLogSegment segment : activeSegments) {
            closeAndDeleteSegmentUnsafe(segment, deleteSegments);
        }
        activeSegments.clear();
        size.set(0L);
        logger.trace("CLSM done with closing and clearing existing commit log segments.");
    }

    /**
     * Test helper: if no reserve segment exists and the buffer pool is not at its limit, waits
     * for the allocator task to publish one; otherwise returns immediately.
     */
    @VisibleForTesting
    public void awaitManagementTasksCompletion() {
        if (availableSegment != null || atSegmentBufferLimit()) {
            return;
        }
        awaitAvailableSegment(allocatingFrom);
    }

    /**
     * Best-effort discard of a segment during test teardown; an {@link AssertionError} raised
     * by {@code discard} is deliberately swallowed.
     *
     * @param segment the segment to discard
     * @param delete  passed through to {@code discard}
     */
    private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete) {
        try {
            discard(segment, delete);
        }
        catch (AssertionError ignored) {
            // Intentionally ignored so that teardown continues with the remaining segments.
            // NOTE(review): presumably raised when the segment file is already gone; confirm.
        }
    }

    /**
     * Initiates shutdown: stops the allocator loop, discards any reserve segment, and wakes the
     * allocator task in case it is parked on its wait queue.
     */
    public void shutdown() {
        executor.shutdownNow();
        discardAvailableSegment();
        wakeManager();
    }

    /**
     * Atomically takes the reserve segment (if any) out of its slot and discards it. The slot is
     * cleared under the manager's monitor; the discard itself runs outside it.
     */
    private void discardAvailableSegment() {
        CommitLogSegment orphan;
        synchronized (this) {
            orphan = availableSegment;
            availableSegment = null;
        }
        if (orphan == null) {
            return;
        }
        orphan.discard(true);
    }

    /**
     * Waits for the allocator loop to terminate, then closes every active segment and empties
     * the buffer pool if one was created. The cleanup runs whether or not the wait timed out.
     *
     * @param timeout maximum time to wait
     * @param units   unit of {@code timeout}
     * @return the result of the executor's {@code awaitTermination}
     * @throws InterruptedException if interrupted while waiting; no cleanup is done in that case
     */
    public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException {
        boolean terminated = executor.awaitTermination(timeout, units);
        for (CommitLogSegment active : activeSegments) {
            active.close();
        }
        if (bufferPool != null) {
            bufferPool.emptyBufferPool();
        }
        return terminated;
    }

    /**
     * @return a read-only view backed by the live queue of active segments (not a snapshot)
     */
    @VisibleForTesting
    public Collection<CommitLogSegment> getActiveSegments() {
        Collection<CommitLogSegment> readOnlyView = Collections.unmodifiableCollection(activeSegments);
        return readOnlyView;
    }

    /**
     * @return the current commit log position of the segment being allocated from
     */
    CommitLogPosition getCurrentPosition() {
        CommitLogSegment current = allocatingFrom;
        return current.getCurrentCommitLogPosition();
    }

    /**
     * Syncs the active segments in queue order, stopping at the first segment whose id is
     * greater than that of the segment that was current when the call began.
     *
     * @param flush passed through to each segment's {@code sync}
     * @throws IOException declared for callers; propagated from segment syncing
     */
    public void sync(boolean flush) throws IOException {
        CommitLogSegment currentAtStart = allocatingFrom;
        for (CommitLogSegment segment : getActiveSegments()) {
            if (segment.id > currentAtStart.id) {
                // Became active after this sync started; leave it for the next round.
                break;
            }
            segment.sync(flush);
        }
    }

    /**
     * @return the buffer pool created in {@link #start()}, or {@code null} if not yet started
     */
    SimpleCachedBufferPool getBufferPool() {
        return bufferPool;
    }

    /**
     * Signals every waiter on the allocator task's wait queue.
     */
    void wakeManager() {
        managerThreadWaitQueue.signalAll();
    }

    /**
     * Notification hook for a freed buffer; wakes the allocator task so it can re-evaluate its
     * wait condition, which depends on the buffer pool limit.
     */
    void notifyBufferFreed() {
        wakeManager();
    }

    /**
     * @return the segment currently being allocated from; {@code null} before the first promotion
     */
    CommitLogSegment allocatingFrom() {
        return allocatingFrom;
    }

    /**
     * Task run repeatedly by the allocator loop. Each normal iteration builds one segment,
     * publishes it as the reserve segment, optionally triggers reclaiming flushes, and then
     * parks until the manager's wait condition holds.
     */
    class AllocatorRunnable
    implements Interruptible.Task {
        AllocatorRunnable() {
        }

        /**
         * Runs one iteration of the allocator loop.
         *
         * @param state loop state supplied by the executor; {@code SHUTTING_DOWN} only discards
         *              any leftover reserve segment
         * @throws InterruptedException if an interrupt was observed during the iteration; the
         *                              reserve segment is discarded first
         */
        @Override
        public void run(Interruptible.State state) throws InterruptedException {
            final AbstractCommitLogSegmentManager manager = AbstractCommitLogSegmentManager.this;
            boolean interrupted = false;
            try {
                switch (state) {
                    case SHUTTING_DOWN: {
                        manager.discardAvailableSegment();
                        return;
                    }
                    case NORMAL: {
                        assert (manager.availableSegment == null);
                        synchronized (this) {
                            // Thread.interrupted() clears the flag; remember it so it can be
                            // honoured once segment creation is done.
                            interrupted = Thread.interrupted();
                            logger.trace("No segments in reserve; creating a fresh one");
                            manager.availableSegment = manager.createSegment();
                            manager.segmentPrepared.signalAll();
                            Thread.yield();
                            if (manager.availableSegment == null && !manager.atSegmentBufferLimit()) {
                                // The segment was consumed already; go straight to the next iteration.
                                return;
                            }
                            manager.maybeFlushToReclaim();
                        }
                    }
                }
            }
            catch (Throwable t) {
                if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) {
                    manager.discardAvailableSegment();
                    throw new Interruptible.TerminateException();
                }
                // Back off before continuing after a handled failure.
                Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
            }

            if (!interrupted) {
                interrupted = Thread.interrupted();
            }
            if (!interrupted) {
                try {
                    WaitQueue.waitOnCondition(manager.managerThreadWaitCondition, manager.managerThreadWaitQueue);
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                manager.discardAvailableSegment();
                throw new InterruptedException();
            }
        }
    }
}

