package com.microsoft.azure.cosmos.connectors.cassandra.filewatcher;

import com.google.common.base.Preconditions;
import com.microsoft.azure.cosmos.connectors.cassandra.commitlogprocessor.v6.CommitLogSegmentProcessor;
import com.microsoft.azure.cosmos.connectors.cassandra.config.CommitLogUploadConfig;
import java.io.File;
import java.io.IOException;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/cosmos/connectors/cassandra/filewatcher/ActiveCommitLogProcessor.class */
public class ActiveCommitLogProcessor {
    private final Logger logger = LoggerFactory.getLogger((Class<?>) ActiveCommitLogProcessor.class);
    private final ScheduledExecutorService scheduler;
    private final CommitLogSegmentProcessor commitLogSegmentProcessor;
    private final CommitLogUploadConfig commitLogUploadConfig;
    private final Path commitLogDir;
    private final CommitLogSegmentOffsetCache commitlogSegmentOffsetCache;
    private final Path commitLogSegmentWriteDir;
    private volatile String lastActiveCommitLog;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ActiveCommitLogProcessor(String str, CommitLogUploadConfig commitLogUploadConfig, CommitLogSegmentOffsetCache commitLogSegmentOffsetCache, boolean z) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException, MalformedObjectNameException {
        Preconditions.checkNotNull(commitLogUploadConfig, "commitLogUploadConfig");
        Preconditions.checkArgument(commitLogUploadConfig.isEnableOffsetProcessing(), "enableOffsetProcessing");
        Preconditions.checkArgument(StringUtils.isNotEmpty(str), "commitLogDir");
        String commitLogSegmentProcessorWriteDir = commitLogUploadConfig.getCommitLogSegmentProcessorWriteDir();
        Preconditions.checkArgument(StringUtils.isNotEmpty(commitLogSegmentProcessorWriteDir), "commitLogSegmentProcessorWriteDir");
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.commitLogSegmentProcessor = new CommitLogSegmentProcessor(commitLogUploadConfig.getTmpCommitLogSegmentProcessorWriteDir(), z);
        this.commitLogUploadConfig = commitLogUploadConfig;
        this.commitLogDir = Paths.get(str, new String[0]);
        this.commitlogSegmentOffsetCache = commitLogSegmentOffsetCache;
        this.commitLogSegmentWriteDir = Paths.get(commitLogSegmentProcessorWriteDir, new String[0]);
    }

    public void start() {
        registerCommitLogProcessorTask();
    }

    private void registerCommitLogProcessorTask() {
        this.scheduler.scheduleAtFixedRate(() -> {
            try {
                Path findActiveCommitLog = findActiveCommitLog();
                if (findActiveCommitLog == null) {
                    this.logger.error("Couldn't find current active segment");
                    return;
                }
                String path = findActiveCommitLog.getFileName().toString();
                if (!StringUtils.equals(this.lastActiveCommitLog, path)) {
                    this.logger.debug("active commitlog changed, passing this round");
                    this.lastActiveCommitLog = path;
                    return;
                }
                Integer currentOffset = this.commitlogSegmentOffsetCache.getCurrentOffset(path);
                if (currentOffset == null) {
                    currentOffset = 0;
                }
                Pair<Integer, File> processSegment = this.commitLogSegmentProcessor.processSegment(findActiveCommitLog.toFile(), currentOffset.intValue());
                int intValue = processSegment.getLeft().intValue();
                File right = processSegment.getRight();
                this.logger.info("Completed Processing Commit Log Segment [{}] at offset: [{}], new offset is [{}], new mutations written to file [{}]", findActiveCommitLog, currentOffset, Integer.valueOf(intValue), right);
                if (!$assertionsDisabled && intValue != currentOffset.intValue() && right == null) {
                    throw new AssertionError();
                }
                if (right != null) {
                    Path path2 = right.toPath();
                    Path path3 = Paths.get(this.commitLogSegmentWriteDir.toString(), right.getName());
                    this.logger.info("Moving commit log Segment from [{}] to [{}]", path2, path3);
                    Files.move(path2, path3, new CopyOption[0]);
                }
                this.commitlogSegmentOffsetCache.setOffset(findActiveCommitLog.getFileName().getFileName().toString(), intValue);
            } catch (IOException e) {
                this.logger.error("Failed processing commit log segment {}", (Object) null, e);
            } catch (Exception e2) {
                this.logger.error("Unexpected failure in processing commitLog segment {}", (Object) null, e2);
            }
        }, this.commitLogUploadConfig.getMaxInactivityTimeInMillis(), this.commitLogUploadConfig.getMaxInactivityTimeInMillis(), TimeUnit.MILLISECONDS);
    }

    private Path findActiveCommitLog() throws Exception {
        CommitLogTimestampComparatorUsingPath commitLogTimestampComparatorUsingPath = new CommitLogTimestampComparatorUsingPath();
        Stream<Path> list = Files.list(this.commitLogDir);
        Throwable th = null;
        try {
            try {
                Path path = (Path) Utils.findSecondMaxOrNull((List) list.collect(Collectors.toList()), commitLogTimestampComparatorUsingPath);
                if (list != null) {
                    if (0 != 0) {
                        try {
                            list.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        list.close();
                    }
                }
                return path;
            } finally {
            }
        } catch (Throwable th3) {
            if (list != null) {
                if (th != null) {
                    try {
                        list.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    list.close();
                }
            }
            throw th3;
        }
    }

    public void shutdown() {
        this.scheduler.shutdown();
    }

    static {
        $assertionsDisabled = !ActiveCommitLogProcessor.class.desiredAssertionStatus();
    }
}
